[SPARK-57652][SS][TESTS] Deflake snapshotStartBatchId-with-transformWithState StateDataSource test#56721
[SPARK-57652][SS][TESTS] Deflake snapshotStartBatchId-with-transformWithState StateDataSource test#56721HyukjinKwon wants to merge 1 commit into
Conversation
| // asynchronous, so a too-short sleep leaves `2.zip` missing -> FileNotFoundException). | ||
| Execute { _ => | ||
| val opStateDir = new File(tmpDir, "state/0") | ||
| eventually(timeout(60.seconds), interval(100.milliseconds)) { |
There was a problem hiding this comment.
So, technically, we increase the maximum timeout from 5s to 60s?
|
Self-review (code-review skill, high effort) — posted as a comment, not an approval. Scope: 1 file, +21/-2 (test only). Reviewed for correctness, removed-behavior, cross-file, simplification, altitude, conventions. Correctness ✅ (no bugs found)
Minor (maintainability)
Optional follow-up (altitude, out of scope here)
No blocking concerns. |
|
A full Maven (Scala 2.13, JDK 21) integration run surfaced that the Root cause is deeper than my first patch assumed: the background maintenance thread snapshots the current version, so a version-2 Correct fix: wait for the version-2 snapshot right after version 2 is committed (while it is still current), which deterministically forces maintenance to create it before more batches advance the version. I'll push that and re-validate all variants (unsaferow + avro) with multiple runs before this should be merged. (My earlier validation only multi-ran the |
…ithState StateDataSource test
### What changes were proposed in this pull request?
Replace the fixed `Thread.sleep(5000)` in the "snapshotStartBatchId with transformWithState"
test of `StateDataSourceTransformWithStateSuite` with a deterministic wait, performed *while the
target state-store version is still the current version*, for the RocksDB snapshot `.zip` the
`snapshotStartBatchId` reader needs. A `waitForStateSnapshot(version, partitions)` helper polls
until the snapshot file exists; on timeout it prints the actual state-directory contents to make a
recurrence in scheduled jobs diagnosable.
### Why are the changes needed?
The snapshot upload is asynchronous, and the maintenance thread only ever snapshots the *current*
state-store version. The old code slept at the end (after the version had advanced to 5), so the
version-2 snapshot the reader needs could be missing and never re-created, failing the scheduled
Maven (Scala 2.13, JDK 21/25) builds:
[CANNOT_LOAD_STATE_STORE.UNCATEGORIZED] ...
Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist
Waiting while version 2 is current deterministically forces maintenance to create that snapshot
(default minBatchesToRetain=100 keeps it until the read).
### Does this PR introduce _any_ user-facing change?
No. Test-only.
### How was this patch tested?
Ran the test (both unsaferow and avro encodings) repeatedly on JDK 21; see PR description for CI links.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8
Co-authored-by: Isaac
7bad4f5 to
10e5a84
Compare
|
✅ Updated with a robust fix and re-validated; back to ready for review. The previous patch was insufficient (the Re-validated across both encodings, 8× (the gap last time was only multi-running The on-timeout diagnostic (prints the state-dir contents) is retained for future debuggability. |
…t for tws deflake Supersedes the earlier end-of-stream wait, which could still time out on the avro variant (maintenance only snapshots the current version). Validated both encodings x8 (run 28081816565). Matches standalone PR apache#56721. Co-authored-by: Isaac
|
Merged to master and branch-4.x. |
…thState StateDataSource test ### What changes were proposed in this pull request? Replace the fixed `Thread.sleep(5000)` in the `"snapshotStartBatchId with transformWithState"` test of `StateDataSourceTransformWithStateSuite` with a deterministic `eventually(...)` wait that polls until the RocksDB snapshot files the `snapshotStartBatchId` reader needs (snapshot version 2 for the partitions read) have actually been uploaded by the asynchronous maintenance thread. The root cause is known, but to guard against regressions the timeout assertion now prints the actual state-directory contents, so a recurrence in scheduled jobs is immediately diagnosable (snapshot still pending vs. cleaned up vs. wrong path) rather than a bare failure. ### Why are the changes needed? The snapshot upload is asynchronous (background maintenance thread), so the fixed sleep is racy under CI load. When it is too short, the snapshot `.zip` is not yet uploaded and the reader fails on the scheduled **Maven (Scala 2.13, JDK 21)** and **JDK 25** builds: ``` [CANNOT_LOAD_STATE_STORE.UNCATEGORIZED] An error occurred during loading state. Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist ``` ### Does this PR introduce _any_ user-facing change? No. Test-only. ### How was this patch tested? - **Before (failing job):** [`Build / Maven (Scala 2.13, JDK 21)` → `sql#core - slow tests`](https://github.com/apache/spark/actions/runs/28048347820/job/83041196522) — `snapshotStartBatchId with transformWithState ... *** FAILED ***` (`FileNotFoundException ... 2.zip`). - **After (passing, run 10x to confirm the flake is gone):** [✅ 10/10 passed](https://github.com/HyukjinKwon/spark/actions/runs/28074724227/job/83116481122) — the test was executed 10 consecutive times in one sbt session, all green. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Opus 4.8 Closes #56721 from HyukjinKwon/SPARK-57652-tws-deflake. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com> (cherry picked from commit 850a41e) Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com>
What changes were proposed in this pull request?
Replace the fixed
Thread.sleep(5000)in the"snapshotStartBatchId with transformWithState"test ofStateDataSourceTransformWithStateSuitewith a deterministiceventually(...)wait that polls until the RocksDB snapshot files thesnapshotStartBatchIdreader needs (snapshot version 2 for the partitions read) have actually been uploaded by the asynchronous maintenance thread.The root cause is known, but to guard against regressions the timeout assertion now prints the actual state-directory contents, so a recurrence in scheduled jobs is immediately diagnosable (snapshot still pending vs. cleaned up vs. wrong path) rather than a bare failure.
Why are the changes needed?
The snapshot upload is asynchronous (background maintenance thread), so the fixed sleep is racy under CI load. When it is too short, the snapshot
.zipis not yet uploaded and the reader fails on the scheduled Maven (Scala 2.13, JDK 21) and JDK 25 builds:Does this PR introduce any user-facing change?
No. Test-only.
How was this patch tested?
Build / Maven (Scala 2.13, JDK 21)→sql#core - slow tests—snapshotStartBatchId with transformWithState ... *** FAILED ***(FileNotFoundException ... 2.zip).Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8