[SPARK-57652][SS][TESTS] Deflake snapshotStartBatchId-with-transformWithState StateDataSource test#56721

Closed
HyukjinKwon wants to merge 1 commit into apache:master from HyukjinKwon:SPARK-57652-tws-deflake

Conversation

@HyukjinKwon

Member

What changes were proposed in this pull request?

Replace the fixed Thread.sleep(5000) in the "snapshotStartBatchId with transformWithState" test of StateDataSourceTransformWithStateSuite with a deterministic eventually(...) wait that polls until the RocksDB snapshot files the snapshotStartBatchId reader needs (snapshot version 2 for the partitions read) have actually been uploaded by the asynchronous maintenance thread.

The root cause is known, but to guard against regressions the timeout assertion now prints the actual state-directory contents, so a recurrence in scheduled jobs is immediately diagnosable (snapshot still pending vs. cleaned up vs. wrong path) rather than a bare failure.

Why are the changes needed?

The snapshot upload is asynchronous (background maintenance thread), so the fixed sleep is racy under CI load. When it is too short, the snapshot .zip is not yet uploaded and the reader fails on the scheduled Maven (Scala 2.13, JDK 21) and JDK 25 builds:

[CANNOT_LOAD_STATE_STORE.UNCATEGORIZED] An error occurred during loading state.
Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist

Does this PR introduce any user-facing change?

No. Test-only.

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.8

@dongjoon-hyun dongjoon-hyun left a comment

Member

+1, LGTM.

// asynchronous, so a too-short sleep leaves `2.zip` missing -> FileNotFoundException).
Execute { _ =>
val opStateDir = new File(tmpDir, "state/0")
eventually(timeout(60.seconds), interval(100.milliseconds)) {

@dongjoon-hyun dongjoon-hyun Jun 24, 2026

Member

So, technically, we increase the maximum timeout from 5s to 60s?

@HyukjinKwon

Member Author

Self-review (code-review skill, high effort) — posted as a comment, not an approval.

Scope: 1 file, +21/-2 (test only). Reviewed for correctness, removed-behavior, cross-file, simplification, altitude, conventions.

Correctness ✅ (no bugs found)

  • Version is right: the wait polls for snapshot version 2 (2(_.*)?\.zip), which is exactly what the reader needs — snapshotStartBatchId = 1 ⇒ snapshot version 1 + 1 = 2, and matches the original failure path (.../state/0/1/2.zip).
  • Partitions are right: it waits on partitions 1 and 4, the same partitions the test reads below (snapshotPartitionId 1 and 4); operatorId = 0 ⇒ state/0. SHUFFLE_PARTITIONS = 5 so those partitions exist.
  • Null-safe: Option(partitionDir.listFiles()) handles the dir-not-yet-created case (returns Seq.empty, keeps polling) instead of NPE.
  • Regex is anchored: matches is whole-string, so 2(_.*)?\.zip matches 2.zip and 2_<uid>.zip (v1 and checkpoint-v2 naming) but not 12.zip/20.zip — no false positives across the suite subclasses.
  • No lost coverage vs the removed Thread.sleep: changelogs are uploaded synchronously at commit, so only the snapshot .zip needed the wait; the new wait covers exactly that, deterministically. Waiting immediately before StopStream also guarantees the file is present when state freezes (strictly better than the sleep).
  • Validated by running the test 10× in one sbt session (all green, linked in the description).
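
The null-safety and regex-anchoring points above can be illustrated with a minimal sketch. The names `SnapshotFileCheck`, `snapshotPattern`, and `snapshotUploaded` are hypothetical and are not taken from the patch; only the pattern `2(_.*)?\.zip` and the `Option(partitionDir.listFiles())` idiom come from the review.

```scala
import java.io.File

// Hypothetical helper names; only the regex and the Option(listFiles()) idiom
// are taken from the review text.
object SnapshotFileCheck {
  // Whole-string pattern for a snapshot of `version`, e.g. "2.zip" or "2_<uid>.zip".
  def snapshotPattern(version: Long): String = s"$version(_.*)?\\.zip"

  // True once the partition directory holds a snapshot file for `version`.
  // listFiles() returns null when the directory does not exist yet, so wrapping
  // it in Option turns that case into "not uploaded" instead of an NPE.
  def snapshotUploaded(partitionDir: File, version: Long): Boolean =
    Option(partitionDir.listFiles())
      .exists(_.exists(_.getName.matches(snapshotPattern(version))))
}
```

Because `String.matches` requires the whole name to match, `12.zip` and `20.zip` are rejected without explicit `^`/`$` anchors.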

Minor (maintainability)

  • The snapshot version 2 is hardcoded to track the hardcoded snapshotStartBatchId = 1. If a future edit changes snapshotStartBatchId, this wait silently goes stale (times out / waits on the wrong file). Low risk since both are test-local constants a few lines apart, but a one-line comment tying them together wouldn't hurt.
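
One possible way to tie the two constants together, shown only as a sketch: derive the snapshot version from the batch ID instead of hardcoding both. The object and value names here are hypothetical; the `+ 1` relationship is the one stated in the correctness notes above.

```scala
// Hypothetical names; the patch itself hardcodes both values.
object SnapshotConstants {
  // Batch the reader is asked to start from (the test's hardcoded value).
  val snapshotStartBatchId: Long = 1L
  // Per the review: snapshotStartBatchId = 1 => snapshot version 1 + 1 = 2.
  // Deriving it keeps the wait in step if the batch ID is ever edited.
  val snapshotVersion: Long = snapshotStartBatchId + 1
}
```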

Optional follow-up (altitude, out of scope here)

  • A deeper alternative would be to make the snapshotStartBatchId reader tolerant of a not-yet-uploaded snapshot at the product level, rather than each test waiting. That's a behavior change beyond a test deflake; flagging only as a possible future direction.

No blocking concerns.

@HyukjinKwon

Member Author

⚠️ Hold merge — this fix is not robust; I'm pushing a correction.

A full Maven (Scala 2.13, JDK 21) integration run surfaced that the (encoding = avro) variant of this test still fails with the new wait:

snapshotStartBatchId with transformWithState (with changelog checkpointing) (encoding = avro) *** FAILED ***
The code passed to eventually never returned normally. Attempted 608 times over 1.0 minutes.
Last failure message: snapshotUploaded was false Snapshot (version 2) for partition 1 was not uploaded in time.

Root cause is deeper than my first patch assumed: the background maintenance thread snapshots the current version, so a version-2 .zip only ever gets created while version 2 is the current version (between the 2nd and 3rd batches). Waiting for it at the end (when version 5 is current) can never make it appear — so when maintenance didn't happen to snapshot v2 during processing, the wait just times out (a different flake, not a fix).

Correct fix: wait for the version-2 snapshot right after version 2 is committed (while it is still current), which deterministically forces maintenance to create it before more batches advance the version. I'll push that and re-validate all variants (unsaferow + avro) with multiple runs before this should be merged.
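
The difference between the two wait positions can be seen in a toy model. This is not Spark's RocksDB state store or its maintenance thread; `ToyStateStore` and its methods are invented for illustration and encode only the one property described above, that maintenance snapshots whatever version is current when it runs.

```scala
import scala.collection.mutable

// Toy model only: not Spark's state store or maintenance thread.
final class ToyStateStore {
  private var current = 0L
  private val snapshots = mutable.Set.empty[Long]

  // Committing a batch advances the current version by one.
  def commitBatch(): Long = { current += 1; current }
  // Maintenance can only snapshot the version that is current when it runs.
  def runMaintenance(): Unit = if (current > 0) snapshots += current
  def hasSnapshot(version: Long): Boolean = snapshots.contains(version)
}

object ToyStateStoreDemo {
  // Maintenance first runs after all five batches: version 2 is never snapshotted,
  // so waiting for it at the end cannot succeed.
  def waitAtEnd(): Boolean = {
    val store = new ToyStateStore
    (1 to 5).foreach(_ => store.commitBatch())
    store.runMaintenance()
    store.hasSnapshot(2)
  }

  // Maintenance runs while version 2 is current: the snapshot is created
  // before later batches advance the version.
  def waitWhileCurrent(): Boolean = {
    val store = new ToyStateStore
    (1 to 5).foreach { _ =>
      if (store.commitBatch() == 2) store.runMaintenance()
    }
    store.hasSnapshot(2)
  }
}
```

In this model `waitAtEnd()` reports that the version-2 snapshot is missing, while `waitWhileCurrent()` finds it, which mirrors the timeout seen in the avro run.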

(My earlier validation only multi-ran the unsaferow variant — that was the gap.)

HyukjinKwon marked this pull request as draft on June 24, 2026 07:10
…ithState StateDataSource test

### What changes were proposed in this pull request?
Replace the fixed `Thread.sleep(5000)` in the "snapshotStartBatchId with transformWithState"
test of `StateDataSourceTransformWithStateSuite` with a deterministic wait, performed *while the
target state-store version is still the current version*, for the RocksDB snapshot `.zip` the
`snapshotStartBatchId` reader needs. A `waitForStateSnapshot(version, partitions)` helper polls
until the snapshot file exists; on timeout it prints the actual state-directory contents to make a
recurrence in scheduled jobs diagnosable.

### Why are the changes needed?
The snapshot upload is asynchronous, and the maintenance thread only ever snapshots the *current*
state-store version. The old code slept at the end (after the version had advanced to 5), so the
version-2 snapshot the reader needs could be missing and never re-created, failing the scheduled
Maven (Scala 2.13, JDK 21/25) builds:

    [CANNOT_LOAD_STATE_STORE.UNCATEGORIZED] ...
    Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist

Waiting while version 2 is current deterministically forces maintenance to create that snapshot
(default minBatchesToRetain=100 keeps it until the read).

### Does this PR introduce _any_ user-facing change?
No. Test-only.

### How was this patch tested?
Ran the test (both unsaferow and avro encodings) repeatedly on JDK 21; see PR description for CI links.

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8

Co-authored-by: Isaac
HyukjinKwon force-pushed the SPARK-57652-tws-deflake branch from 7bad4f5 to 10e5a84 on June 24, 2026 10:07
HyukjinKwon marked this pull request as ready for review on June 24, 2026 10:07
@HyukjinKwon

Member Author

Updated with a robust fix and re-validated; back to ready for review.

The previous patch was insufficient (the avro variant could still time out, as the integration run showed). The fix now waits for the version-2 snapshot while version 2 is the current version (right after the 2nd batch), via a waitForStateSnapshot(version, partitions) helper — this deterministically forces the maintenance thread to create the snapshot, since it only ever snapshots the current version.
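
For readers without the diff at hand, the shape of such a helper might look like the sketch below. It is an assumption-laden stand-in, not the code in this PR: the real helper presumably uses ScalaTest's `eventually`, whereas this version uses a plain deadline loop so that it runs with no test framework, and the `opStateDir`, `timeoutMs`, and `intervalMs` parameters are hypothetical.

```scala
import java.io.File

// Hypothetical stand-in for the PR's waitForStateSnapshot helper; the signature
// beyond (version, partitions) and the plain polling loop are assumptions.
object SnapshotWait {
  def waitForStateSnapshot(
      opStateDir: File,
      version: Long,
      partitions: Seq[Int],
      timeoutMs: Long = 60000L,
      intervalMs: Long = 100L): Unit = {
    val pattern = s"$version(_.*)?\\.zip"

    // A partition is ready once its directory contains "<version>.zip" or
    // "<version>_<uid>.zip"; a missing directory counts as not ready.
    def uploaded(partition: Int): Boolean =
      Option(new File(opStateDir, partition.toString).listFiles())
        .exists(_.exists(_.getName.matches(pattern)))

    val deadline = System.nanoTime() + timeoutMs * 1000000L
    while (!partitions.forall(uploaded)) {
      if (deadline - System.nanoTime() <= 0) {
        // On timeout, report what is actually on disk so the failure is diagnosable.
        val contents = partitions.map { p =>
          val names = Option(new File(opStateDir, p.toString).list())
            .map(_.sorted.mkString(", "))
          s"partition $p: ${names.getOrElse("<missing directory>")}"
        }.mkString("; ")
        throw new AssertionError(
          s"Snapshot (version $version) was not uploaded in time. " +
            s"State dir contents: $contents")
      }
      Thread.sleep(intervalMs)
    }
  }
}
```

In the test this would be invoked right after the second batch commits, for example `SnapshotWait.waitForStateSnapshot(new File(tmpDir, "state/0"), 2L, Seq(1, 4))`, so that version 2 is still current while the loop polls.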

Re-validated across both encodings, 8× (the gap last time was only multi-running unsaferow):
run — 8 consecutive sbt runs, each Tests: succeeded 2, failed 0 (both unsaferow and avro), 16/16 green, including the previously-flaky avro variant.

The on-timeout diagnostic (prints the state-dir contents) is retained for future debuggability.

HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Jun 24, 2026
…t for tws deflake

Supersedes the earlier end-of-stream wait, which could still time out on the avro variant
(maintenance only snapshots the current version). Validated both encodings x8 (run 28081816565).
Matches standalone PR apache#56721.

Co-authored-by: Isaac
@HyukjinKwon

Member Author

Merged to master and branch-4.x.

HyukjinKwon added a commit that referenced this pull request Jun 24, 2026
…thState StateDataSource test

### What changes were proposed in this pull request?
Replace the fixed `Thread.sleep(5000)` in the `"snapshotStartBatchId with transformWithState"` test of `StateDataSourceTransformWithStateSuite` with a deterministic `eventually(...)` wait that polls until the RocksDB snapshot files the `snapshotStartBatchId` reader needs (snapshot version 2 for the partitions read) have actually been uploaded by the asynchronous maintenance thread.

The root cause is known, but to guard against regressions the timeout assertion now prints the actual state-directory contents, so a recurrence in scheduled jobs is immediately diagnosable (snapshot still pending vs. cleaned up vs. wrong path) rather than a bare failure.

### Why are the changes needed?
The snapshot upload is asynchronous (background maintenance thread), so the fixed sleep is racy under CI load. When it is too short, the snapshot `.zip` is not yet uploaded and the reader fails on the scheduled **Maven (Scala 2.13, JDK 21)** and **JDK 25** builds:

```
[CANNOT_LOAD_STATE_STORE.UNCATEGORIZED] An error occurred during loading state.
Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist
```

### Does this PR introduce _any_ user-facing change?
No. Test-only.

### How was this patch tested?
- **Before (failing job):** [`Build / Maven (Scala 2.13, JDK 21)` → `sql#core - slow tests`](https://github.com/apache/spark/actions/runs/28048347820/job/83041196522) — `snapshotStartBatchId with transformWithState ... *** FAILED ***` (`FileNotFoundException ... 2.zip`).
- **After (passing, run 10x to confirm the flake is gone):** [✅ 10/10 passed](https://github.com/HyukjinKwon/spark/actions/runs/28074724227/job/83116481122) — the test was executed 10 consecutive times in one sbt session, all green.

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.8

Closes #56721 from HyukjinKwon/SPARK-57652-tws-deflake.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com>
(cherry picked from commit 850a41e)
Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com>