diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala index d7f28b79acff4..f861388605e0e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala @@ -20,6 +20,7 @@ import java.io.File import java.time.Duration import org.apache.hadoop.conf.Configuration +import org.scalatest.time.SpanSugar._ import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.{Encoders, Row} @@ -1050,20 +1051,47 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest .transformWithState(new AggregationStatefulProcessor(), TimeMode.None(), OutputMode.Append()) + + // Block until the async maintenance thread has written the RocksDB snapshot `.zip` for the + // given state-store `version` (matching both `.zip` and checkpoint-v2 `_.zip`) + // on each of `partitions`. Used right after `version` is committed (while it is the current + // version), since maintenance only ever snapshots the current version. On timeout it prints + // the actual directory contents so a recurrence in scheduled jobs is diagnosable. + def waitForStateSnapshot(version: Long, partitions: Seq[Int]): StreamAction = Execute { _ => + val opStateDir = new File(tmpDir, "state/0") + eventually(timeout(60.seconds), interval(100.milliseconds)) { + partitions.foreach { partition => + val partitionDir = new File(opStateDir, partition.toString) + val files = Option(partitionDir.listFiles()) + .map(_.map(_.getName).sorted.toSeq).getOrElse(Seq.empty) + val snapshotUploaded = files.exists(_.matches(s"$version(_.*)?\\.zip")) + assert(snapshotUploaded, + s"Snapshot (version $version) for partition $partition was not uploaded in time. " + + s"Contents of $partitionDir: ${files.mkString("[", ", ", "]")}") + } + } + } + testStream(query)( StartStream(checkpointLocation = tmpDir.getCanonicalPath), AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L)), ProcessAllAvailable(), AddData(inputData, (5, 1L), (6, 2L), (7, 3L), (8, 4L)), ProcessAllAvailable(), + // State version 2 is now the current version. The snapshotStartBatchId=1 reader below + // needs the version-2 snapshot, and the asynchronous maintenance thread only creates a + // snapshot for the *current* version. So block here, while version 2 is still current, + // until that snapshot has actually been written - this deterministically forces it to + // exist. (A fixed sleep at the *end* is flaky: once later batches advance the current + // version, maintenance never goes back to snapshot version 2, so it may never appear and + // the reader fails with FileNotFoundException on `2.zip`.) + waitForStateSnapshot(version = 2, partitions = Seq(1, 4)), AddData(inputData, (9, 1L), (10, 2L), (11, 3L), (12, 4L)), ProcessAllAvailable(), AddData(inputData, (13, 1L), (14, 2L), (15, 3L), (16, 4L)), ProcessAllAvailable(), AddData(inputData, (17, 1L), (18, 2L), (19, 3L), (20, 4L)), ProcessAllAvailable(), - // Ensure that we get a chance to upload created snapshots - Execute { _ => Thread.sleep(5000) }, StopStream ) }