Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.io.File
import java.time.Duration

import org.apache.hadoop.conf.Configuration
import org.scalatest.time.SpanSugar._

import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.{Encoders, Row}
Expand Down Expand Up @@ -1050,20 +1051,47 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
.transformWithState(new AggregationStatefulProcessor(),
TimeMode.None(),
OutputMode.Append())

// Builds a stream action that polls the operator-0 state directory under `tmpDir` until every
// partition in `partitions` contains a RocksDB snapshot archive for state-store `version`.
// Both naming schemes are accepted: plain `<v>.zip` and checkpoint-v2 `<v>_<uid>.zip`.
// Call it immediately after `version` has been committed, i.e. while it is still the current
// version, because the async maintenance thread only snapshots the current version. If the
// 60 second budget runs out, the failure message lists what the partition directory actually
// contains, so that a recurrence in scheduled jobs can be diagnosed.
def waitForStateSnapshot(version: Long, partitions: Seq[Int]): StreamAction = Execute { _ =>
  val opStateDir = new File(tmpDir, "state/0")
  // The pattern depends only on `version`, so build it once rather than on every poll.
  val snapshotPattern = s"$version(_.*)?\\.zip"
  eventually(timeout(60.seconds), interval(100.milliseconds)) {
    for (partition <- partitions) {
      val partitionDir = new File(opStateDir, partition.toString)
      // `listFiles()` yields null when the directory does not exist yet; treat that as empty.
      val fileNames = Option(partitionDir.listFiles())
        .fold(Seq.empty[String])(_.map(_.getName).sorted.toSeq)
      val snapshotUploaded = fileNames.exists(_.matches(snapshotPattern))
      assert(snapshotUploaded,
        s"Snapshot (version $version) for partition $partition was not uploaded in time. " +
        s"Contents of $partitionDir: ${fileNames.mkString("[", ", ", "]")}")
    }
  }
}

// Run the query against the checkpoint in `tmpDir`, feeding five rounds of four input rows
// each and draining the stream after every round. After the second round the test pauses
// until the version-2 snapshot exists on partitions 1 and 4 before adding more data.
testStream(query)(
StartStream(checkpointLocation = tmpDir.getCanonicalPath),
AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L)),
ProcessAllAvailable(),
AddData(inputData, (5, 1L), (6, 2L), (7, 3L), (8, 4L)),
ProcessAllAvailable(),
// State version 2 is now the current version. The snapshotStartBatchId=1 reader below
// needs the version-2 snapshot, and the asynchronous maintenance thread only creates a
// snapshot for the *current* version. So block here, while version 2 is still current,
// until that snapshot has actually been written - this deterministically forces it to
// exist. (A fixed sleep at the *end* is flaky: once later batches advance the current
// version, maintenance never goes back to snapshot version 2, so it may never appear and
// the reader fails with FileNotFoundException on `2.zip`.)
waitForStateSnapshot(version = 2, partitions = Seq(1, 4)),
AddData(inputData, (9, 1L), (10, 2L), (11, 3L), (12, 4L)),
ProcessAllAvailable(),
AddData(inputData, (13, 1L), (14, 2L), (15, 3L), (16, 4L)),
ProcessAllAvailable(),
AddData(inputData, (17, 1L), (18, 2L), (19, 3L), (20, 4L)),
ProcessAllAvailable(),
// Best-effort pause before stopping the stream, giving snapshots created for the later
// batches a chance to be uploaded. Unlike the explicit wait above, this is a fixed
// 5 second sleep and does not verify that any snapshot was actually written.
Execute { _ => Thread.sleep(5000) },
StopStream
)
}
Expand Down