From 1db0d5379bccf7207f3b56531982cf74df6c3dd5 Mon Sep 17 00:00:00 2001
From: ericm-db
Date: Wed, 20 May 2026 20:12:45 +0000
Subject: [PATCH] [SPARK-56972][SS] Persist sink name in V3 commit log via MicroBatchExecution

Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches:

- Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`.
- When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`.
- When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged.

This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred.

SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable.

Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off.

Added four new tests in `StreamingSinkEvolutionSuite`:

- V3 commit log records the active sink for a named query.
- Renaming the sink across a restart retains the previous sink as `isActive = false` and marks the new one active.
- With sink evolution disabled, the commit log remains V1/V2.
- Enabling sink evolution on a checkpoint that previously used V1/V2 transparently upgrades to V3 on the next commit.

Existing `StreamingSinkEvolutionSuite`, `CommitLogSuite`, `MicroBatchExecutionSuite`, and `AsyncProgressTrackingMicroBatchExecutionSuite` all pass.

Generated-by: Claude Code (claude-opus-4-7)
Co-authored-by: Isaac
--- .../runtime/MicroBatchExecution.scala | 46 ++++++- .../test/StreamingSinkEvolutionSuite.scala | 113 ++++++++++++++++++ 2 files changed, 156 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala index 94143799a8c41..2f50c205cb46d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala @@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper} -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2} +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitLog, CommitMetadataV3, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2, SinkMetadataInfo} import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, 
StatefulOpStateStoreCheckpointInfo, StateStoreWriter} import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE} import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1} @@ -129,6 +129,15 @@ class MicroBatchExecution( } } + // Historical sink metadata read from the commit log on restart. Insertion order is preserved so + // that we can re-emit deactivated sinks in the same order they originally appeared. Mutated by + // [[populateStartOffsets]] (reads) and by the commit-log write in [[runBatch]] (updates). + private val sinkMetadataMap = mutable.LinkedHashMap.empty[String, SinkMetadataInfo] + + /** True when the current query should persist V3 sink metadata in the commit log. */ + private def commitLogV3Enabled: Boolean = + sparkSession.sessionState.conf.enableStreamingSinkEvolution + @volatile protected[sql] var triggerExecutor: TriggerExecutor = _ protected def getTrigger(): TriggerExecutor = { @@ -765,6 +774,11 @@ class MicroBatchExecution( commitMetadata.stateUniqueIds.foreach { stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds } + commitMetadata match { + case v3: CommitMetadataV3 => + sinkMetadataMap ++= v3.sinkMetadataMap + case _ => + } if (latestBatchId == latestCommittedBatchId) { /* The last batch was successfully committed, so we can safely process a * new next batch but first: @@ -1463,10 +1477,36 @@ class MicroBatchExecution( } else { None } - if (!commitLog.add(execCtx.batchId, + val metadata = if (commitLogV3Enabled) { + val sinkApiVersion = sink match { + case _: SupportsWrite => "DSv2" + case _ => "DSv1" + } + val currentSinkInfo = SinkMetadataInfo( + sinkName = sinkName, + commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET, + providerName = sink.getClass.getName, + apiVersion = sinkApiVersion, + isActive = true) + // Mark every previously-seen sink as inactive, then overlay the 
current sink as active. + // The previous entry for [[sinkName]], if any, is overwritten here. + val deactivated = sinkMetadataMap.iterator + .map { case (name, info) => name -> info.copy(isActive = false) } + .toMap + val updatedSinkMap = deactivated + (sinkName -> currentSinkInfo) + sinkMetadataMap.clear() + sinkMetadataMap ++= updatedSinkMap commitLog.createMetadata( nextBatchWatermarkMs = watermarkTracker.currentWatermark, - stateUniqueIds = stateStoreCkptId))) { + stateUniqueIds = stateStoreCkptId, + sinkMetadataMap = updatedSinkMap, + commitLogFormatVersion = CommitLog.VERSION_3) + } else { + commitLog.createMetadata( + nextBatchWatermarkMs = watermarkTracker.currentWatermark, + stateUniqueIds = stateStoreCkptId) + } + if (!commitLog.add(execCtx.batchId, metadata)) { throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala index a242faabaf921..ba25d19a91100 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/StreamingSinkEvolutionSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterEach, Tag} import org.apache.spark.SparkException import org.apache.spark.sql._ +import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadataV3} import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamTest @@ -183,6 +184,118 @@ class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach { q2.stop() } + // =========================== + // Commit log V3 persistence + // =========================== + + testWithSinkEvolution("commit log records V3 metadata with named sink") { + val 
checkpointDir = newMetadataDir + val input = MemoryStream[Int] + input.addData(1, 2, 3) + val q = input.toDF().writeStream + .format("noop") + .name("my_sink") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val latest = commitLog.getLatest().getOrElse(fail("No commit recorded")) + val v3 = latest._2 match { + case v: CommitMetadataV3 => v + case other => fail(s"Expected CommitMetadataV3, got $other") + } + val active = v3.activeSinkMetadataInfo + assert(active.sinkName === "my_sink") + assert(active.isActive) + assert(v3.sinkMetadataMap.size === 1) + } + + testWithSinkEvolution("commit log V3 retains historical sink after rename") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // First batch under sink name "old_sink". + input.addData(1) + val q1 = input.toDF().writeStream + .format("noop") + .name("old_sink") + .option("checkpointLocation", checkpointDir) + .start() + q1.processAllAvailable() + q1.stop() + + // Restart with a new sink name "new_sink" against the same checkpoint. 
+ input.addData(2) + val q2 = input.toDF().writeStream + .format("noop") + .name("new_sink") + .option("checkpointLocation", checkpointDir) + .start() + q2.processAllAvailable() + q2.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3] + assert(v3.sinkMetadataMap.keySet === Set("old_sink", "new_sink")) + assert(v3.activeSinkMetadataInfo.sinkName === "new_sink") + assert(v3.sinkMetadataMap("old_sink").isActive === false) + assert(v3.sinkMetadataMap("new_sink").isActive === true) + } + + test("commit log stays V1/V2 when sink evolution is disabled") { + val checkpointDir = newMetadataDir + withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + val input = MemoryStream[Int] + input.addData(1, 2) + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val latest = commitLog.getLatest().get._2 + assert(latest.version === CommitLog.VERSION_1 || latest.version === CommitLog.VERSION_2, + s"Expected V1 or V2 commit log, got v${latest.version}") + assert(!latest.isInstanceOf[CommitMetadataV3]) + } + + testWithSinkEvolution("enabling sink evolution mid-checkpoint upgrades commit log to V3") { + val checkpointDir = newMetadataDir + val input = MemoryStream[Int] + + // First run with sink evolution disabled writes V1/V2, no sink metadata. + withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") { + input.addData(1) + val q = input.toDF().writeStream + .format("noop") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + } + + // Restart with sink evolution enabled, supplying a name. V3 should now be written; the + // previous V1/V2 batches contribute no historical sinks. 
+ input.addData(2) + val q = input.toDF().writeStream + .format("noop") + .name("upgraded_sink") + .option("checkpointLocation", checkpointDir) + .start() + q.processAllAvailable() + q.stop() + + val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true) + val v3 = commitLog.getLatest().get._2.asInstanceOf[CommitMetadataV3] + assert(v3.activeSinkMetadataInfo.sinkName === "upgraded_sink") + assert(v3.sinkMetadataMap.size === 1) + } + // ============== // Helper Methods // ==============