From 10e5a84edb6360120a4689dbb5e3176bfa3cbb11 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 24 Jun 2026 09:28:57 +0900 Subject: [PATCH] [SPARK-57652][SS][TESTS] Deflake snapshotStartBatchId-with-transformWithState StateDataSource test ### What changes were proposed in this pull request? Replace the fixed `Thread.sleep(5000)` in the "snapshotStartBatchId with transformWithState" test of `StateDataSourceTransformWithStateSuite` with a deterministic wait, performed *while the target state-store version is still the current version*, for the RocksDB snapshot `.zip` the `snapshotStartBatchId` reader needs. A `waitForStateSnapshot(version, partitions)` helper polls until the snapshot file exists; on timeout it prints the actual state-directory contents to make a recurrence in scheduled jobs diagnosable. ### Why are the changes needed? The snapshot upload is asynchronous, and the maintenance thread only ever snapshots the *current* state-store version. The old code slept at the end (after the version had advanced to 5), so the version-2 snapshot the reader needs could be missing and never re-created, failing the scheduled Maven (Scala 2.13, JDK 21/25) builds: [CANNOT_LOAD_STATE_STORE.UNCATEGORIZED] ... Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist Waiting while version 2 is current deterministically forces maintenance to create that snapshot (default minBatchesToRetain=100 keeps it until the read). ### Does this PR introduce _any_ user-facing change? No. Test-only. ### How was this patch tested? Ran the test (both unsaferow and avro encodings) repeatedly on JDK 21; see PR description for CI links. ### Was this patch authored or co-authored using generative AI tooling? 
Generated-by: Claude Opus 4.8 Co-authored-by: Isaac --- ...ateDataSourceTransformWithStateSuite.scala | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala index d7f28b79acff4..f861388605e0e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala @@ -20,6 +20,7 @@ import java.io.File import java.time.Duration import org.apache.hadoop.conf.Configuration +import org.scalatest.time.SpanSugar._ import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.{Encoders, Row} @@ -1050,20 +1051,47 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest .transformWithState(new AggregationStatefulProcessor(), TimeMode.None(), OutputMode.Append()) + + // Block until the async maintenance thread has written the RocksDB snapshot `.zip` for the + // given state-store `version` (matching both `<version>.zip` and checkpoint-v2 `<version>_<suffix>.zip`) + // on each of `partitions`. Used right after `version` is committed (while it is the current + // version), since maintenance only ever snapshots the current version. On timeout it prints + // the actual directory contents so a recurrence in scheduled jobs is diagnosable. 
+ def waitForStateSnapshot(version: Long, partitions: Seq[Int]): StreamAction = Execute { _ => + val opStateDir = new File(tmpDir, "state/0") + eventually(timeout(60.seconds), interval(100.milliseconds)) { + partitions.foreach { partition => + val partitionDir = new File(opStateDir, partition.toString) + val files = Option(partitionDir.listFiles()) + .map(_.map(_.getName).sorted.toSeq).getOrElse(Seq.empty) + val snapshotUploaded = files.exists(_.matches(s"$version(_.*)?\\.zip")) + assert(snapshotUploaded, + s"Snapshot (version $version) for partition $partition was not uploaded in time. " + + s"Contents of $partitionDir: ${files.mkString("[", ", ", "]")}") + } + } + } + testStream(query)( StartStream(checkpointLocation = tmpDir.getCanonicalPath), AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L)), ProcessAllAvailable(), AddData(inputData, (5, 1L), (6, 2L), (7, 3L), (8, 4L)), ProcessAllAvailable(), + // State version 2 is now the current version. The snapshotStartBatchId=1 reader below + // needs the version-2 snapshot, and the asynchronous maintenance thread only creates a + // snapshot for the *current* version. So block here, while version 2 is still current, + // until that snapshot has actually been written - this deterministically forces it to + // exist. (A fixed sleep at the *end* is flaky: once later batches advance the current + // version, maintenance never goes back to snapshot version 2, so it may never appear and + // the reader fails with FileNotFoundException on `2.zip`.) + waitForStateSnapshot(version = 2, partitions = Seq(1, 4)), AddData(inputData, (9, 1L), (10, 2L), (11, 3L), (12, 4L)), ProcessAllAvailable(), AddData(inputData, (13, 1L), (14, 2L), (15, 3L), (16, 4L)), ProcessAllAvailable(), AddData(inputData, (17, 1L), (18, 2L), (19, 3L), (20, 4L)), ProcessAllAvailable(), - // Ensure that we get a chance to upload created snapshots - Execute { _ => Thread.sleep(5000) }, StopStream ) }