From 67dd8a08286b4436b55f37f8d497561f7af05f35 Mon Sep 17 00:00:00 2001 From: Eric Marnadi Date: Mon, 8 Jun 2026 20:42:56 -0700 Subject: [PATCH 1/3] [SPARK-56970][SS] Split CommitMetadata into CommitMetadataBase + V1/V2 case classes Backport of [SPARK-56970] (apache/spark#56018) to `branch-4.x`. Refactor `CommitLog` so that the commit log metadata is dispatched through a `CommitMetadataBase` trait with concrete `CommitMetadata` (V1, watermark only) and `CommitMetadataV2` (watermark + `stateUniqueIds`) case classes. The deserializer now reads the wire-format version from the file header and constructs the matching subclass. This is preparation for `CommitMetadataV3` (which adds sink metadata for streaming sink evolution) in a follow-up. Notable changes: - Add `CommitMetadataBase` trait and `CommitMetadataV2` case class. - `CommitMetadata` becomes V1 (no `stateUniqueIds` field). - Add `CommitLog.createMetadata` factory that dispatches by version and defaults to the configured `STATE_STORE_CHECKPOINT_FORMAT_VERSION`. - `CommitLog.readCommitMetadata` reads the version line and constructs the matching subclass. - `MicroBatchExecution`, `OfflineStateRepartitionRunner`, and the existing tests are updated to use the new types / factory. The pre-refactor `CommitMetadata` carried both the V1 and V2 wire shape in a single case class, with `stateUniqueIds` optional. That made it awkward to add a V3 wire format with additional fields, and forced `serialize` to take the wire version from `SQLConf` rather than from the metadata itself. No new public API. The wire format for V1 changes slightly: V1 commit log files no longer serialize `stateUniqueIds: null`. Old V1 files continue to be read because the V1 deserializer ignores the (now-unknown) field. This PR also relaxes the version-exact-match check on read so that a commit log opened with the V2 conf can deserialize a V1 file. This incidentally resolves SPARK-50653. 
- Existing `CommitLogSuite` (V1, V2, and cross-version); the cross-version test now asserts successful V1 deserialization. - `sql/core` main and test sources compile cleanly on `branch-4.x` (`build/sbt sql/Test/compile`). Generated-by: Claude Code (claude-opus-4-7) Closes #56307 from ericm-db/SPARK-56970-branch-4.x. Authored-by: Eric Marnadi Signed-off-by: Wenchen Fan --- .../checkpointing/AsyncCommitLog.scala | 4 +- .../streaming/checkpointing/CommitLog.scala | 131 +++++++++++++++--- .../runtime/MicroBatchExecution.scala | 6 +- .../state/OfflineStateRepartitionRunner.scala | 4 +- .../streaming/state/StateRewriter.scala | 34 ++--- .../StateDataSourceChangeDataReadSuite.scala | 6 +- .../v2/state/StateDataSourceReadSuite.scala | 77 ++++++---- ...artitionAllColumnFamiliesWriterSuite.scala | 2 +- .../spark/sql/streaming/CommitLogSuite.scala | 53 +++++-- 9 files changed, 222 insertions(+), 95 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala index 116ea18326ef0..0f031fcbb9512 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/AsyncCommitLog.scala @@ -48,7 +48,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: * the async write of the batch is completed. Future may also be completed exceptionally * to indicate some write error. 
*/ - def addAsync(batchId: Long, metadata: CommitMetadata): CompletableFuture[Long] = { + def addAsync(batchId: Long, metadata: CommitMetadataBase): CompletableFuture[Long] = { require(metadata != null, "'null' metadata cannot be written to a metadata log") val future: CompletableFuture[Long] = addNewBatchByStreamAsync(batchId) { output => serialize(metadata, output) @@ -72,7 +72,7 @@ class AsyncCommitLog(sparkSession: SparkSession, path: String, executorService: * @param metadata metadata of batch to write * @return true if operation is successful otherwise false. */ - def addInMemory(batchId: Long, metadata: CommitMetadata): Boolean = { + def addInMemory(batchId: Long, metadata: CommitMetadataBase): Boolean = { if (batchCache.containsKey(batchId)) { false } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala index b73020b6060c6..820aecf70d0ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala @@ -26,6 +26,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf /** @@ -50,39 +51,119 @@ class CommitLog( sparkSession: SparkSession, path: String, readOnly: Boolean = false) - extends HDFSMetadataLog[CommitMetadata](sparkSession, path, readOnly) { + extends HDFSMetadataLog[CommitMetadataBase](sparkSession, path, readOnly) { import CommitLog._ - private val VERSION: Int = sparkSession.conf.get( + // The configured commit log format version. Used as the default version when callers + // construct metadata through [[createMetadata]]. 
+ private[sql] val defaultVersion: Int = sparkSession.conf.get( SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key).toInt - override protected[sql] def deserialize(in: InputStream): CommitMetadata = { - // called inside a try-finally where the underlying stream is closed in the caller - val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() - if (!lines.hasNext) { - throw new IllegalStateException("Incomplete log file in the offset commit log") - } - // TODO [SPARK-49462] This validation should be relaxed for a stateless query. - // TODO [SPARK-50653] This validation should be relaxed to support reading - // a V1 log file when VERSION is V2 - validateVersionExactMatch(lines.next().trim, VERSION) - val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON - CommitMetadata(metadataJson) + override protected[sql] def deserialize(in: InputStream): CommitMetadataBase = { + CommitLog.readCommitMetadata(in) } - override protected[sql] def serialize(metadata: CommitMetadata, out: OutputStream): Unit = { + override protected[sql] def serialize(metadata: CommitMetadataBase, out: OutputStream): Unit = { // called inside a try-finally where the underlying stream is closed in the caller - out.write(s"v${VERSION}".getBytes(UTF_8)) + out.write(s"v${metadata.version}".getBytes(UTF_8)) out.write('\n') // write metadata out.write(metadata.json.getBytes(UTF_8)) } + + /** + * Factory for creating a [[CommitMetadataBase]] for the requested wire format version. + * Defaults to the version configured via [[SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION]]. 
+ */ + def createMetadata( + nextBatchWatermarkMs: Long = 0, + stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None, + commitLogFormatVersion: Int = defaultVersion): CommitMetadataBase = { + commitLogFormatVersion match { + case VERSION_2 => + CommitMetadataV2(nextBatchWatermarkMs, stateUniqueIds) + case VERSION_1 => + // VERSION_1 cannot persist stateUniqueIds; withStateUniqueIds enforces this invariant + // (it throws if stateUniqueIds is non-empty). + CommitMetadata(nextBatchWatermarkMs).withStateUniqueIds(stateUniqueIds) + case v => + throw QueryExecutionErrors.logVersionGreaterThanSupported(v, CommitLog.MAX_VERSION) + } + } } object CommitLog { private val EMPTY_JSON = "{}" + val VERSION_1 = 1 + val VERSION_2 = 2 + val MAX_VERSION: Int = VERSION_2 + + /** + * Reads a single commit log entry and dispatches to the matching + * [[CommitMetadataBase]] subclass based on the wire format version recorded in the file. + */ + private[spark] def readCommitMetadata(in: InputStream): CommitMetadataBase = { + val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines() + if (!lines.hasNext) { + throw new IllegalStateException("Incomplete log file in the offset commit log") + } + val version = MetadataVersionUtil.validateVersion(lines.next().trim, MAX_VERSION) + val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON + version match { + case VERSION_2 => CommitMetadataV2(metadataJson) + case VERSION_1 => CommitMetadata(metadataJson) + case v => throw QueryExecutionErrors.logVersionGreaterThanSupported(v, MAX_VERSION) + } + } +} + +/** + * Base trait for commit log metadata. Concrete subclasses correspond to wire format versions + * and override [[version]] accordingly. 
+ */ +trait CommitMetadataBase extends Serializable { + def version: Int + def nextBatchWatermarkMs: Long + def stateUniqueIds: Option[Map[Long, Array[Array[String]]]] + + /** + * Returns a copy of this metadata with the given state store unique ids, preserving the + * concrete subclass and all of its other fields. Deriving a new commit from an existing one + * should go through this method (rather than reconstructing via [[CommitLog.createMetadata]]) + * so that version-specific fields are not silently dropped when new metadata versions are + * introduced. + */ + def withStateUniqueIds( + stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataBase + + def json: String = Serialization.write(this)(CommitMetadata.format) +} + +/** + * Commit log metadata for [[CommitLog.VERSION_1]]. Records the watermark for the next batch only. + * + * @param nextBatchWatermarkMs The watermark of the next batch. + */ +case class CommitMetadata( + nextBatchWatermarkMs: Long = 0) extends CommitMetadataBase { + override def version: Int = CommitLog.VERSION_1 + override def stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None + + override def withStateUniqueIds( + stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadata = { + require(stateUniqueIds.forall(_.isEmpty), + s"stateUniqueIds cannot be set for commit log format version ${CommitLog.VERSION_1}; " + + s"use version ${CommitLog.VERSION_2} to persist state store checkpoint ids.") + this + } +} + +object CommitMetadata { + implicit val format: Formats = Serialization.formats(NoTypeHints) + + def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json) } /** @@ -104,19 +185,23 @@ object CommitLog { * +--- ...... * In the commit log, in addition to nextBatchWatermarkMs, we also store the unique ids of the * state store files. + * * @param nextBatchWatermarkMs The watermark of the next batch. 
* @param stateUniqueIds Map[Long, Array[Array[String]]] of map * OperatorId -> (partitionID -> array of uniqueID) */ - -case class CommitMetadata( +case class CommitMetadataV2( nextBatchWatermarkMs: Long = 0, - stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None) { - def json: String = Serialization.write(this)(CommitMetadata.format) + stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None) extends CommitMetadataBase { + override def version: Int = CommitLog.VERSION_2 + + override def withStateUniqueIds( + stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataV2 = + copy(stateUniqueIds = stateUniqueIds) } -object CommitMetadata { - implicit val format: Formats = Serialization.formats(NoTypeHints) +object CommitMetadataV2 { + import CommitMetadata.format - def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json) + def apply(json: String): CommitMetadataV2 = Serialization.read[CommitMetadataV2](json) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 84f0373ca5d48..94143799a8c41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, 
StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitMetadata, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} @@ -1464,7 +1464,9 @@ class MicroBatchExecution( None } if (!commitLog.add(execCtx.batchId, - CommitMetadata(watermarkTracker.currentWatermark, stateStoreCkptId))) { + commitLog.createMetadata( + nextBatchWatermarkMs = watermarkTracker.currentWatermark, + stateUniqueIds = stateStoreCkptId))) { throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala index 1491d26989062..dc13fa1030a0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OfflineStateRepartitionRunner.scala @@ -294,7 +294,9 @@ class OfflineStateRepartitionRunner( lastCommittedBatchId: Long, opIdToStateStoreCkptInfo: Option[Map[Long, Array[Array[String]]]]): Unit = { val latestCommit = checkpointMetadata.commitLog.get(lastCommittedBatchId).get - val commitMetadata = latestCommit.copy(stateUniqueIds = opIdToStateStoreCkptInfo) + // Derive the new 
commit from the latest one so version-specific fields are preserved and the + // wire format version stays consistent with the source checkpoint. + val commitMetadata = latestCommit.withStateUniqueIds(opIdToStateStoreCkptInfo) if (!checkpointMetadata.commitLog.add(newBatchId, commitMetadata)) { throw QueryExecutionErrors.concurrentStreamLogUpdate(newBatchId) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala index fd890161caafd..546a9a6019647 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateRewriter.scala @@ -22,7 +22,7 @@ import java.util.UUID import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkIllegalStateException, SparkThrowable, TaskContext} +import org.apache.spark.{SparkIllegalStateException, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.internal.LogKeys._ @@ -376,27 +376,19 @@ class StateRewriter( } private def verifyCheckpointFormatVersion(): Unit = { - // Verify checkpoint version in sqlConf based on commitLog for readCheckpoint - // in case user forgot to set STATE_STORE_CHECKPOINT_FORMAT_VERSION. - // Using read batch commit since the latest commit could be a skipped batch. - // If SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION is wrong, readCheckpoint.commitLog - // will throw an exception, and we will propagate this exception upstream. 
- // This prevents the StateRewriter from failing to write the correct state files - try { - readCheckpoint.commitLog.get(readBatchId) - } catch { - case e: IllegalStateException if e.getCause != null && - e.getCause.isInstanceOf[SparkThrowable] => - val sparkThrowable = e.getCause.asInstanceOf[SparkThrowable] - if (sparkThrowable.getCondition == "INVALID_LOG_VERSION.EXACT_MATCH_VERSION") { - val params = sparkThrowable.getMessageParameters - val expectedVersion = params.get("version") - val actualVersion = params.get("matchVersion") - throw StateRewriterErrors.stateCheckpointFormatVersionMismatchError( - checkpointLocationForRead, expectedVersion, actualVersion) - } - throw e + // Verify checkpoint version in sqlConf matches the version recorded in the read commit log, + // in case the user forgot to set STATE_STORE_CHECKPOINT_FORMAT_VERSION. This prevents the + // StateRewriter from writing state files in a format that disagrees with the source + // checkpoint. Using the read batch commit since the latest commit could be a skipped batch. 
+ readCheckpoint.commitLog.get(readBatchId).foreach { metadata => + val configuredVersion = readCheckpoint.commitLog.defaultVersion + if (metadata.version != configuredVersion) { + throw StateRewriterErrors.stateCheckpointFormatVersionMismatchError( + checkpointLocationForRead, + expectedVersion = metadata.version.toString, + actualVersion = configuredVersion.toString) } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala index bae78f0b4762f..4e9f6cca2ffc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceChangeDataReadSuite.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration import org.scalatest.Assertions import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata} +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataV2} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream, StreamExecution} import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.functions.{col, window} @@ -237,11 +237,11 @@ abstract class StateDataSourceChangeDataReaderSuite extends StateDataSourceTestB new File(tempDir.getAbsolutePath, "commits").getAbsolutePath) // Start version: treated as v1 (no operator unique ids) - val startMetadata = CommitMetadata(0, None) + val startMetadata = CommitMetadata(0) assert(commitLog.add(0, startMetadata)) // End version: treated as v2 (operator 0 has unique ids) - val endMetadata = CommitMetadata(0, + val endMetadata = CommitMetadataV2(0, Some(Map[Long, Array[Array[String]]](0L -> Array(Array("uid"))))) 
assert(commitLog.add(1, endMetadata)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala index 2def79828fac1..4a2a454077a7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala @@ -23,7 +23,7 @@ import java.util.UUID import org.apache.hadoop.conf.Configuration import org.scalatest.Assertions -import org.apache.spark.{SparkException, SparkThrowable, SparkUnsupportedOperationException} +import org.apache.spark.{SparkException, SparkUnsupportedOperationException} import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.{AnalysisException, DataFrame, Encoders, Row} import org.apache.spark.sql.catalyst.expressions.{BoundReference, GenericInternalRow} @@ -589,8 +589,6 @@ class RocksDBWithCheckpointV2StateDataSourceReaderSuite extends StateDataSourceR override protected def newStateStoreProvider(): RocksDBStateStoreProvider = new RocksDBStateStoreProvider - import testImplicits._ - override def beforeAll(): Unit = { super.beforeAll() spark.conf.set(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION, 2) @@ -600,34 +598,57 @@ class RocksDBWithCheckpointV2StateDataSourceReaderSuite extends StateDataSourceR "true") } - // TODO: Remove this test once we allow migrations from checkpoint v1 to v2 - test("reading checkpoint v2 store with version 1 should fail") { - withTempDir { tmpDir => - val inputData = MemoryStream[(Int, Long)] - val query = getStreamStreamJoinQuery(inputData) - testStream(query)( - StartStream(checkpointLocation = tmpDir.getCanonicalPath), - AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)), - ProcessAllAvailable(), - Execute { _ => Thread.sleep(2000) }, - StopStream - ) + // 
Expected state after runLargeDataStreamingAggregationQuery, read from batch 2 / operator 0. + private val expectedLargeAggregationState: Seq[Row] = Seq( + Row(0, 5, 60, 30, 0), Row(1, 5, 65, 31, 1), Row(2, 5, 70, 32, 2), + Row(3, 4, 72, 33, 3), Row(4, 4, 76, 34, 4), Row(5, 4, 80, 35, 5), + Row(6, 4, 84, 36, 6), Row(7, 4, 88, 37, 7), Row(8, 4, 92, 38, 8), + Row(9, 4, 96, 39, 9)) + + private def readLargeAggregationState(checkpointDir: String): DataFrame = + spark.read.format("statestore") + .option(StateSourceOptions.PATH, checkpointDir) + .option(StateSourceOptions.BATCH_ID, 2) + .option(StateSourceOptions.OPERATOR_ID, 0) + .load() + .selectExpr("key.groupKey AS key_groupKey", "value.count AS value_cnt", + "value.sum AS value_sum", "value.max AS value_max", "value.min AS value_min") + // SPARK-56970: The commit log wire format version is now discovered from the file header + // rather than required to match STATE_STORE_CHECKPOINT_FORMAT_VERSION. As a result a V1 commit + // log can be read under a V2-configured session (and vice versa). Note this only applies to the + // commit log layer; reading a V2 state store still requires version 2 to be configured because + // the state store files are named with checkpoint unique ids. + test("SPARK-56970: reading a v1 checkpoint with commit log version 2 configured succeeds") { + withTempDir { tempDir => + // Override the suite default to write a V1 checkpoint (no checkpoint unique ids). 
withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "1") { - // Verify reading state throws error when reading checkpoint v2 with version 1 - val exc = intercept[IllegalStateException] { - val stateDf = spark.read.format("statestore") - .option(StateSourceOptions.BATCH_ID, 0) - .option(StateSourceOptions.OPERATOR_ID, 0) - .load(tmpDir.getCanonicalPath) - stateDf.collect() - } + runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath) + } + + // The suite default reads with version 2 configured; the V1 commit log must still be read. + checkAnswer( + readLargeAggregationState(tempDir.getAbsolutePath), expectedLargeAggregationState) + } + } - checkError(exc.getCause.asInstanceOf[SparkThrowable], - "INVALID_LOG_VERSION.EXACT_MATCH_VERSION", "KD002", - Map( - "version" -> "2", - "matchVersion" -> "1")) + test("SPARK-56970: reading a v2 checkpoint with commit log version 1 configured fails on the " + + "state store, not the commit log") { + withTempDir { tempDir => + // The suite configures commit log format version 2, so this writes a V2 checkpoint whose + // state store files are named with checkpoint unique ids. + runLargeDataStreamingAggregationQuery(tempDir.getAbsolutePath) + + withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "1") { + // The commit log now deserializes across versions, so this no longer fails with + // INVALID_LOG_VERSION at the commit-log layer. Reading the V2 state store itself still + // requires version 2 to be configured: with version 1 the reader looks for non-unique + // state file names and cannot locate the unique-id-named files. 
+ val ex = intercept[SparkException] { + readLargeAggregationState(tempDir.getAbsolutePath).collect() + } + assert(ex.getMessage.contains("CANNOT_LOAD_STATE_STORE") || + Option(ex.getCause).map(_.getMessage).exists(_.contains("CANNOT_LOAD_STATE_STORE"))) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala index be7874e806cd8..22d0af0a77fd6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatePartitionAllColumnFamiliesWriterSuite.scala @@ -99,7 +99,7 @@ class StatePartitionAllColumnFamiliesWriterSuite extends StateDataSourceTestBase // Commit to commitLog with checkpoint IDs val latestCommit = targetCheckpointMetadata.commitLog.get(lastBatch).get - val commitMetadata = latestCommit.copy(stateUniqueIds = checkpointInfos) + val commitMetadata = latestCommit.withStateUniqueIds(checkpointInfos) targetCheckpointMetadata.commitLog.add(writeBatchId, commitMetadata) val versionToCheck = writeBatchId + 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala index aa5826572240f..e5c81924e16bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, FileInputStream, FileOutputStream} import java.nio.file.Path import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata} +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, 
CommitMetadataBase, CommitMetadataV2} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -63,7 +63,7 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { ) } - private def testSerde(commitMetadata: CommitMetadata, path: Path): Unit = { + private def testSerde(commitMetadata: CommitMetadataBase, path: Path): Unit = { if (regenerateGoldenFiles) { val commitLog = new CommitLog(spark, path.toString) val outputStream = new FileOutputStream(path.resolve("testCommitLog").toFile) @@ -103,19 +103,21 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { 0L -> Array(Array("unique_id1", "unique_id2"), Array("unique_id3", "unique_id4")), 1L -> Array(Array("unique_id5", "unique_id6"), Array("unique_id7", "unique_id8")) ) - val testMetadataV2 = CommitMetadata(0, Some(testStateUniqueIds)) + val testMetadataV2 = CommitMetadataV2(0, Some(testStateUniqueIds)) testSerde(testMetadataV2, testCommitLogV2FilePath) } } test("Basic Commit Log V2 SerDe - empty stateUniqueIds") { withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") { - val testMetadataV2 = CommitMetadata(0, Some(Map[Long, Array[Array[String]]]())) + val testMetadataV2 = CommitMetadataV2(0, Some(Map[Long, Array[Array[String]]]())) testSerde(testMetadataV2, testCommitLogV2FilePathEmptyUniqueId) } } - // Old metadata structure with no state unique ids should not affect the deserialization + // SPARK-50653: When the configured commit log version is V2, a V1 file on disk should still + // deserialize successfully into a V1 [[CommitMetadata]] because the wire format version is now + // discovered from the file header rather than enforced to match the conf. 
test("Cross-version V1 SerDe") { withSQLConf(SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION.key -> "2") { val commitlogV1 = """v1 @@ -123,18 +125,41 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { val inputStream: ByteArrayInputStream = new ByteArrayInputStream(commitlogV1.getBytes("UTF-8")) - // TODO [SPARK-50653]: Uncomment the below when v2 -> v1 backward compatibility is added - // val commitMetadata: CommitMetadata = new CommitLog( - // spark, testCommitLogV1FilePath.toString).deserialize(inputStream) - // assert(commitMetadata.nextBatchWatermarkMs === 233) - // assert(commitMetadata.stateUniqueIds === Map.empty) + val commitMetadata = new CommitLog( + spark, testCommitLogV1FilePath.toString).deserialize(inputStream) + assert(commitMetadata.version === CommitLog.VERSION_1) + assert(commitMetadata.nextBatchWatermarkMs === 233) + assert(commitMetadata.stateUniqueIds.isEmpty) + } + } + + test("SPARK-56970: creating a V1 commit with stateUniqueIds should fail") { + withTempDir { tmpDir => + val commitLog = new CommitLog(spark, tmpDir.getCanonicalPath) + val stateUniqueIds: Map[Long, Array[Array[String]]] = + Map(0L -> Array(Array("unique_id1", "unique_id2"))) + + // Through the createMetadata factory with an explicit V1 format version. + val e1 = intercept[IllegalArgumentException] { + commitLog.createMetadata( + nextBatchWatermarkMs = 1, + stateUniqueIds = Some(stateUniqueIds), + commitLogFormatVersion = CommitLog.VERSION_1) + } + assert(e1.getMessage.contains("stateUniqueIds cannot be set")) - // TODO [SPARK-50653]: remove the below when v2 -> v1 backward compatibility is added - val e = intercept[IllegalStateException] { - new CommitLog(spark, testCommitLogV1FilePath.toString).deserialize(inputStream) + // Directly through withStateUniqueIds on a V1 metadata. 
+ val e2 = intercept[IllegalArgumentException] { + CommitMetadata(1).withStateUniqueIds(Some(stateUniqueIds)) } + assert(e2.getMessage.contains("stateUniqueIds cannot be set")) - assert (e.getMessage.contains("only supported log version")) + // None and an empty map are allowed for V1 (no unique ids to persist). + assert(CommitMetadata(1).withStateUniqueIds(None).stateUniqueIds.isEmpty) + assert(commitLog.createMetadata( + nextBatchWatermarkMs = 1, + stateUniqueIds = Some(Map.empty[Long, Array[Array[String]]]), + commitLogFormatVersion = CommitLog.VERSION_1).version === CommitLog.VERSION_1) } } } From ea5478d483ab2a7315b9cd0f87d764125e95fb82 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Mon, 15 Jun 2026 20:57:58 -0700 Subject: [PATCH 2/3] [SPARK-56971][SS] Add CommitMetadataV3 and SinkMetadataInfo for sink evolution Add the commit log data structures for streaming sink evolution: - `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a `sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`). - `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` (serialized via `OffsetV2.json()`), `providerName`, and an `isActive` flag used to distinguish the current sink from historical sinks that were used in earlier batches but are no longer in use. - `CommitMetadataV3.activeSinkMetadataInfoOpt` returns the entry with `isActive = true`, if any. - `CommitLog.createMetadata` learns to produce a `CommitMetadataV3` when `commitLogFormatVersion = VERSION_3`, requiring a non-empty `sinkMetadataMap`. - `CommitLog.readCommitMetadata` dispatches `v3` files to the new class. The V3 metadata is dormant in this PR: no caller produces it yet. Wiring through `MicroBatchExecution` (so each batch persists its sink name + offset, and so restarts read the map back) is the SPARK-56972 follow-up. This PR is built on top of #56018 (SPARK-56970). 
It currently shows the SPARK-56970 commits in its diff; that will resolve once SPARK-56970 merges. SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink evolution. Without a place in the commit log to durably record the sink name and offset alongside the rest of a committed batch's metadata, sink names cannot be observed on restart and the evolution feature cannot be completed. This PR introduces that storage in a separate, narrowly scoped change. No user-facing change. `CommitMetadataV3` is in the internal `org.apache.spark.sql.execution.streaming.checkpointing` package and is not produced by any code path yet. Added unit tests in `CommitLogSuite`: - V3 SerDe with a single active sink (round-trips through commit log). - V3 retains historical sinks alongside the active one and `activeSinkMetadataInfoOpt` resolves correctly. - `createMetadata(commitLogFormatVersion = VERSION_3, sinkMetadataMap = Map.empty)` fails fast with `IllegalArgumentException`. Generated-by: Claude Code (claude-opus-4-7) Closes #56019 from ericm-db/sink-evolution-sink-metadata-info. 
Authored-by: ericm-db Signed-off-by: Anish Shrigondekar (cherry picked from commit 4d262620541385ddd7e70dfa431265be0173f285) Signed-off-by: Anish Shrigondekar --- .../streaming/checkpointing/CommitLog.scala | 86 +++++++++++++++++- .../spark/sql/streaming/CommitLogSuite.scala | 91 ++++++++++++++++++- 2 files changed, 175 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala index 820aecf70d0ec..b5271f664cd76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/checkpointing/CommitLog.scala @@ -26,6 +26,7 @@ import org.json4s.{Formats, NoTypeHints} import org.json4s.jackson.Serialization import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf @@ -76,12 +77,18 @@ class CommitLog( /** * Factory for creating a [[CommitMetadataBase]] for the requested wire format version. * Defaults to the version configured via [[SQLConf.STATE_STORE_CHECKPOINT_FORMAT_VERSION]]. + * + * For [[VERSION_3]], [[sinkMetadataMap]] must be non-empty and contain exactly one active + * sink; [[CommitMetadataV3]] enforces this invariant. 
*/ def createMetadata( nextBatchWatermarkMs: Long = 0, stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None, + sinkMetadataMap: Map[String, SinkMetadataInfo] = Map.empty, commitLogFormatVersion: Int = defaultVersion): CommitMetadataBase = { commitLogFormatVersion match { + case VERSION_3 => + CommitMetadataV3(nextBatchWatermarkMs, stateUniqueIds, sinkMetadataMap) case VERSION_2 => CommitMetadataV2(nextBatchWatermarkMs, stateUniqueIds) case VERSION_1 => @@ -98,7 +105,8 @@ object CommitLog { private val EMPTY_JSON = "{}" val VERSION_1 = 1 val VERSION_2 = 2 - val MAX_VERSION: Int = VERSION_2 + val VERSION_3 = 3 + val MAX_VERSION: Int = VERSION_3 /** * Reads a single commit log entry and dispatches to the matching @@ -112,6 +120,7 @@ object CommitLog { val version = MetadataVersionUtil.validateVersion(lines.next().trim, MAX_VERSION) val metadataJson = if (lines.hasNext) lines.next() else EMPTY_JSON version match { + case VERSION_3 => CommitMetadataV3(metadataJson) case VERSION_2 => CommitMetadataV2(metadataJson) case VERSION_1 => CommitMetadata(metadataJson) case v => throw QueryExecutionErrors.logVersionGreaterThanSupported(v, MAX_VERSION) @@ -205,3 +214,78 @@ object CommitMetadataV2 { def apply(json: String): CommitMetadataV2 = Serialization.read[CommitMetadataV2](json) } + +/** + * Commit log metadata for [[CommitLog.VERSION_3]]. Extends V2 with a map of per-sink metadata + * keyed by sink name. This enables streaming sink evolution: each batch records the active sink + * along with any historical sinks that were used in earlier batches but are no longer active. + * + * @param nextBatchWatermarkMs The watermark of the next batch. + * @param stateUniqueIds Per-operator state store unique ids (see [[CommitMetadataV2]]). + * @param sinkMetadataMap Non-empty map keyed by sink name with exactly one active entry per + * commit; deactivated sinks are retained to detect reuse of a sink name. 
+ */ +case class CommitMetadataV3( + nextBatchWatermarkMs: Long = 0, + stateUniqueIds: Option[Map[Long, Array[Array[String]]]] = None, + sinkMetadataMap: Map[String, SinkMetadataInfo]) extends CommitMetadataBase { + require(sinkMetadataMap.nonEmpty, + "VERSION_3 commit log requires a non-empty sinkMetadataMap") + require(sinkMetadataMap.values.count(_.isActive) == 1, + "VERSION_3 commit log requires exactly one active sink, but found " + + s"${sinkMetadataMap.values.count(_.isActive)} in sinkMetadataMap") + + override def version: Int = CommitLog.VERSION_3 + + override def withStateUniqueIds( + stateUniqueIds: Option[Map[Long, Array[Array[String]]]]): CommitMetadataV3 = + copy(stateUniqueIds = stateUniqueIds) + + /** Returns the currently active sink's metadata; exactly one always exists (see require). */ + def activeSinkMetadataInfo: SinkMetadataInfo = sinkMetadataMap.values.find(_.isActive).get +} + +object CommitMetadataV3 { + implicit val format: Formats = Serialization.formats(NoTypeHints) + + def apply(json: String): CommitMetadataV3 = Serialization.read[CommitMetadataV3](json) +} + +/** + * Per-sink metadata recorded in a [[CommitMetadataV3]] entry. + * + * @param sinkName Sink name as supplied via `DataStreamWriter.name()`, or + * `MicroBatchExecution.DEFAULT_SINK_NAME` when sink evolution is disabled. + * @param commitOffset The latest offset committed to the sink as a JSON string + * (i.e. [[OffsetV2.json()]]), or [[OffsetSeqLog.SERIALIZED_VOID_OFFSET]] if + * no offset is available. + * @param providerName Identifies the sink implementation (e.g. fully-qualified class name). + * @param apiVersion The API version for the sink - whether it is DSv1 or DSv2. + * @param isActive Whether this sink is the active sink for the current batch. Historical sinks + * are retained with `isActive = false`. 
+ */ +case class SinkMetadataInfo( + sinkName: String, + commitOffset: String, + providerName: String, + apiVersion: String, + isActive: Boolean = true) { + def json: String = Serialization.write(this)(SinkMetadataInfo.format) +} + +object SinkMetadataInfo { + private implicit val format: Formats = Serialization.formats(NoTypeHints) + + def apply( + sinkName: String, + commitOffset: Option[OffsetV2], + providerName: String, + apiVersion: String, + isActive: Boolean): SinkMetadataInfo = { + val offsetString = commitOffset match { + case Some(off) => off.json + case None => OffsetSeqLog.SERIALIZED_VOID_OFFSET + } + new SinkMetadataInfo(sinkName, offsetString, providerName, apiVersion, isActive) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala index e5c81924e16bd..5dcfb279fe7aa 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/CommitLogSuite.scala @@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, FileInputStream, FileOutputStream} import java.nio.file.Path import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataBase, CommitMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadata, CommitMetadataBase, CommitMetadataV2, CommitMetadataV3, OffsetSeqLog, SinkMetadataInfo} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -115,6 +115,95 @@ class CommitLogSuite extends SparkFunSuite with SharedSparkSession { } } + test("Basic Commit Log V3 SerDe - single active sink") { + withTempDir { tempDir => + val commitLog = new CommitLog(spark, tempDir.getAbsolutePath) + val sinkInfo = SinkMetadataInfo( + sinkName = "sink-0", + commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET, + 
providerName = "memory", + apiVersion = "v2", + isActive = true) + val metadata = commitLog.createMetadata( + nextBatchWatermarkMs = 42, + sinkMetadataMap = Map("sink-0" -> sinkInfo), + commitLogFormatVersion = CommitLog.VERSION_3) + assert(commitLog.add(0, metadata)) + + val read = commitLog.get(0).get + assert(read.version === CommitLog.VERSION_3) + assert(read.nextBatchWatermarkMs === 42) + val readV3 = read.asInstanceOf[CommitMetadataV3] + assert(readV3.sinkMetadataMap === Map("sink-0" -> sinkInfo)) + assert(readV3.activeSinkMetadataInfo === sinkInfo) + } + } + + test("Commit Log V3 - retains historical sinks alongside active") { + withTempDir { tempDir => + val commitLog = new CommitLog(spark, tempDir.getAbsolutePath) + val historical = SinkMetadataInfo( + sinkName = "sink-0", + commitOffset = """{"offset":3}""", + providerName = "memory", + apiVersion = "v2", + isActive = false) + val active = SinkMetadataInfo( + sinkName = "sink-1", + commitOffset = """{"offset":7}""", + providerName = "memory", + apiVersion = "v2", + isActive = true) + val metadata = commitLog.createMetadata( + nextBatchWatermarkMs = 100, + sinkMetadataMap = Map("sink-0" -> historical, "sink-1" -> active), + commitLogFormatVersion = CommitLog.VERSION_3) + assert(commitLog.add(0, metadata)) + + val readV3 = commitLog.get(0).get.asInstanceOf[CommitMetadataV3] + assert(readV3.activeSinkMetadataInfo === active) + assert(readV3.sinkMetadataMap("sink-0") === historical) + assert(readV3.sinkMetadataMap("sink-1") === active) + } + } + + test("createMetadata for V3 requires non-empty sinkMetadataMap") { + withTempDir { tempDir => + val commitLog = new CommitLog(spark, tempDir.getAbsolutePath) + intercept[IllegalArgumentException] { + commitLog.createMetadata( + nextBatchWatermarkMs = 0, + sinkMetadataMap = Map.empty, + commitLogFormatVersion = CommitLog.VERSION_3) + } + } + } + + test("CommitMetadataV3 requires exactly one active sink") { + val historical = SinkMetadataInfo( + sinkName = "sink-0", + 
commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET, + providerName = "memory", + apiVersion = "v2", + isActive = false) + val active = SinkMetadataInfo( + sinkName = "sink-1", + commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET, + providerName = "memory", + apiVersion = "v2", + isActive = true) + + // No active sink. + intercept[IllegalArgumentException] { + CommitMetadataV3(sinkMetadataMap = Map("sink-0" -> historical)) + } + // More than one active sink. + intercept[IllegalArgumentException] { + CommitMetadataV3(sinkMetadataMap = + Map("sink-0" -> active.copy(sinkName = "sink-0"), "sink-1" -> active)) + } + } + // SPARK-50653: When the configured commit log version is V2, a V1 file on disk should still // deserialize successfully into a V1 [[CommitMetadata]] because the wire format version is now // discovered from the file header rather than enforced to match the conf. From 4cf4d79202f64147e2fbbda3d4fac87ec64a07f8 Mon Sep 17 00:00:00 2001 From: ericm-db Date: Wed, 20 May 2026 20:12:45 +0000 Subject: [PATCH 3/3] [SPARK-56972][SS] Persist sink name in V3 commit log via MicroBatchExecution Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches: - Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`. - When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`. - When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged. This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. 
Provider-mismatch and sink-reuse validation are intentionally deferred. SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable. Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off. Added four new tests in `StreamingSinkEvolutionSuite`: - V3 commit log records the active sink for a named query. - Renaming the sink across a restart retains the previous sink as `isActive = false` and marks the new one active. - With sink evolution disabled, the commit log remains V1/V2. - Enabling sink evolution on a checkpoint that previously used V1/V2 transparently upgrades to V3 on the next commit. Existing `StreamingSinkEvolutionSuite`, `CommitLogSuite`, `MicroBatchExecutionSuite`, and `AsyncProgressTrackingMicroBatchExecutionSuite` all pass. 
Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Isaac --- .../runtime/MicroBatchExecution.scala | 46 ++++++- .../test/StreamingSinkEvolutionSuite.scala | 113 ++++++++++++++++++ 2 files changed, 156 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 94143799a8c41..2f50c205cb46d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitLog, CommitMetadataV3, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2, SinkMetadataInfo} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, 
WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} @@ -129,6 +129,15 @@ class MicroBatchExecution( } } + // Historical sink metadata read from the commit log on restart. Insertion order is preserved so + // that we can re-emit deactivated sinks in the same order they originally appeared. Mutated by + // [[populateStartOffsets]] (reads) and by the commit-log write in [[runBatch]] (updates). + private val sinkMetadataMap = mutable.LinkedHashMap.empty[String, SinkMetadataInfo] + + /** True when the current query should persist V3 sink metadata in the commit log. */ + private def commitLogV3Enabled: Boolean = + sparkSession.sessionState.conf.enableStreamingSinkEvolution + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ protected def getTrigger(): TriggerExecutor = { @@ -765,6 +774,11 @@ class MicroBatchExecution( commitMetadata.stateUniqueIds.foreach { stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds } + commitMetadata match { + case v3: CommitMetadataV3 => + sinkMetadataMap ++= v3.sinkMetadataMap + case _ => + } if (latestBatchId == latestCommittedBatchId) { /* The last batch was successfully committed, so we can safely process a * new next batch but first: @@ -1463,10 +1477,36 @@ class MicroBatchExecution( } else { None } - if (!commitLog.add(execCtx.batchId, + val metadata = if (commitLogV3Enabled) { + val sinkApiVersion = sink match { + case _: SupportsWrite => "DSv2" + case _ => "DSv1" + } + val currentSinkInfo = SinkMetadataInfo( + sinkName = sinkName, + commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET, + providerName = sink.getClass.getName, + apiVersion = sinkApiVersion, + isActive = true) + // Mark every previously-seen sink as inactive, then overlay the current sink as active. + // The previous entry for [[sinkName]], if any, is overwritten here. 
+ val deactivated = sinkMetadataMap.iterator + .map { case (name, info) => name -> info.copy(isActive = false) } + .toMap + val updatedSinkMap = deactivated + (sinkName -> currentSinkInfo) + sinkMetadataMap.clear() + sinkMetadataMap ++= updatedSinkMap commitLog.createMetadata( nextBatchWatermarkMs = watermarkTracker.currentWatermark, - stateUniqueIds = stateStoreCkptId))) { + stateUniqueIds = stateStoreCkptId, + sinkMetadataMap = updatedSinkMap, + commitLogFormatVersion = CommitLog.VERSION_3) + } else { + commitLog.createMetadata( + nextBatchWatermarkMs = watermarkTracker.currentWatermark, + stateUniqueIds = stateStoreCkptId) + } + if (!commitLog.add(execCtx.batchId, metadata)) { throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala index a242faabaf921..ba25d19a91100 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterEach, Tag} import org.apache.spark.SparkException import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadataV3} import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamTest @@ -183,6 +184,118 @@ class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach { q2.stop() } + // =========================== + // Commit log V3 persistence + // =========================== + + testWithSinkEvolution("commit log records V3 metadata with named sink") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val q = 
input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val latest = commitLog.getLatest().getOrElse(fail("No commit recorded")) + val v3 = latest._2 match { + case v: CommitMetadataV3 => v + case other => fail(s"Expected CommitMetadataV3, got $other") + } + val active = v3.activeSinkMetadataInfo + assert(active.sinkName === "my_sink") + assert(active.isActive) + assert(v3.sinkMetadataMap.size === 1) + } + + testWithSinkEvolution("commit log V3 retains historical sink after rename") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // First batch under sink name "old_sink". + input.addData(1) + val q1 = input.toDF().writeStream + .format("noop") + .name("old_sink") + .option("checkpointLocation", checkpointDir) + .start() + q1.processAllAvailable() + q1.stop() + + // Restart with a new sink name "new_sink" against the same checkpoint. 
+ input.addData(2) + val q2 = input.toDF().writeStream + .format("noop") + .name("new_sink") + .option("checkpointLocation", checkpointDir) + .start() + q2.processAllAvailable() + q2.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3] + assert(v3.sinkMetadataMap.keySet === Set("old_sink", "new_sink")) + assert(v3.activeSinkMetadataInfo.sinkName === "new_sink") + assert(v3.sinkMetadataMap("old_sink").isActive === false) + assert(v3.sinkMetadataMap("new_sink").isActive === true) + } + + test("commit log stays V1/V2 when sink evolution is disabled") { + val checkpointDir = newMetadataDir + withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + val input = MemoryStream[Int] + input.addData(1, 2) + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val latest = commitLog.getLatest().get._2 + assert(latest.version === CommitLog.VERSION_1 || latest.version === CommitLog.VERSION_2, + s"Expected V1 or V2 commit log, got v${latest.version}") + assert(!latest.isInstanceOf[CommitMetadataV3]) + } + + testWithSinkEvolution("enabling sink evolution mid-checkpoint upgrades commit log to V3") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // First run with sink evolution disabled writes V1/V2, no sink metadata. + withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + input.addData(1) + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + + // Restart with sink evolution enabled, supplying a name. V3 should now be written; the + // previous V1/V2 batches contribute no historical sinks. 
+ input.addData(2) + val q = input.toDF().writeStream + .format("noop") + .name("upgraded_sink") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3] + assert(v3.activeSinkMetadataInfo.sinkName === "upgraded_sink") + assert(v3.sinkMetadataMap.size === 1) + } + // ============== // Helper Methods // ==============