Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, RealTimeStreamScanExec, StreamingDataSourceV2Relation, StreamingDataSourceV2ScanRelation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
import org.apache.spark.sql.execution.streaming.{AvailableNowTrigger, Offset, OneTimeTrigger, ProcessingTimeTrigger, RealTimeModeAllowlist, RealTimeTrigger, Sink, Source, StreamingQueryPlanTraverseHelper}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2}
import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, CommitLog, CommitMetadataV3, OffsetSeqBase, OffsetSeqLog, OffsetSeqMetadata, OffsetSeqMetadataV2, SinkMetadataInfo}
import org.apache.spark.sql.execution.streaming.operators.stateful.{StatefulOperatorStateInfo, StatefulOpStateStoreCheckpointInfo, StateStoreWriter}
import org.apache.spark.sql.execution.streaming.runtime.StreamingCheckpointConstants.{DIR_NAME_COMMITS, DIR_NAME_OFFSETS, DIR_NAME_STATE}
import org.apache.spark.sql.execution.streaming.sources.{ForeachBatchSink, WriteToMicroBatchDataSource, WriteToMicroBatchDataSourceV1}
Expand Down Expand Up @@ -129,6 +129,15 @@ class MicroBatchExecution(
}
}

// Sink metadata accumulated across restarts, keyed by sink name. An insertion-ordered map is used
// so that deactivated sinks can be re-emitted in the order in which they first appeared. It is
// filled from the commit log in [[populateStartOffsets]] and rewritten by the commit-log write in
// [[runBatch]].
// NOTE(review): the rewrite in [[runBatch]] rebuilds the entries through an immutable Map
// (`.toMap`), which does not guarantee iteration order once it holds more than four entries --
// confirm that the original ordering really survives that round trip.
private val sinkMetadataMap: mutable.LinkedHashMap[String, SinkMetadataInfo] =
  mutable.LinkedHashMap.empty

/**
 * Whether commits written by this query should carry V3 sink metadata in the commit log.
 * Follows the session's streaming sink evolution setting.
 */
private def commitLogV3Enabled: Boolean = {
  val sessionConf = sparkSession.sessionState.conf
  sessionConf.enableStreamingSinkEvolution
}

@volatile protected[sql] var triggerExecutor: TriggerExecutor = _

protected def getTrigger(): TriggerExecutor = {
Expand Down Expand Up @@ -765,6 +774,11 @@ class MicroBatchExecution(
commitMetadata.stateUniqueIds.foreach {
stateUniqueIds => currentStateStoreCkptId ++= stateUniqueIds
}
commitMetadata match {
case v3: CommitMetadataV3 =>
sinkMetadataMap ++= v3.sinkMetadataMap
case _ =>
}
if (latestBatchId == latestCommittedBatchId) {
/* The last batch was successfully committed, so we can safely process a
* new next batch but first:
Expand Down Expand Up @@ -1463,10 +1477,36 @@ class MicroBatchExecution(
} else {
None
}
if (!commitLog.add(execCtx.batchId,
val metadata = if (commitLogV3Enabled) {
val sinkApiVersion = sink match {
case _: SupportsWrite => "DSv2"
case _ => "DSv1"
}
val currentSinkInfo = SinkMetadataInfo(
sinkName = sinkName,
commitOffset = OffsetSeqLog.SERIALIZED_VOID_OFFSET,
providerName = sink.getClass.getName,
apiVersion = sinkApiVersion,
isActive = true)
// Mark every previously-seen sink as inactive, then overlay the current sink as active.
// The previous entry for [[sinkName]], if any, is overwritten here.
val deactivated = sinkMetadataMap.iterator
.map { case (name, info) => name -> info.copy(isActive = false) }
.toMap
val updatedSinkMap = deactivated + (sinkName -> currentSinkInfo)
sinkMetadataMap.clear()
sinkMetadataMap ++= updatedSinkMap
commitLog.createMetadata(
nextBatchWatermarkMs = watermarkTracker.currentWatermark,
stateUniqueIds = stateStoreCkptId))) {
stateUniqueIds = stateStoreCkptId,
sinkMetadataMap = updatedSinkMap,
commitLogFormatVersion = CommitLog.VERSION_3)
} else {
commitLog.createMetadata(
nextBatchWatermarkMs = watermarkTracker.currentWatermark,
stateUniqueIds = stateStoreCkptId)
}
if (!commitLog.add(execCtx.batchId, metadata)) {
throw QueryExecutionErrors.concurrentStreamLogUpdate(execCtx.batchId)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.scalatest.{BeforeAndAfterEach, Tag}

import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.checkpointing.{CommitLog, CommitMetadataV3}
import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamTest
Expand Down Expand Up @@ -183,6 +184,118 @@ class StreamingSinkEvolutionSuite extends StreamTest with BeforeAndAfterEach {
q2.stop()
}

// ===========================
// Commit log V3 persistence
// ===========================

// Runs a single query with a named sink, then checks that the latest commit is V3 and that it
// lists exactly that sink, marked active.
testWithSinkEvolution("commit log records V3 metadata with named sink") {
  val checkpointDir = newMetadataDir
  val source = MemoryStream[Int]
  source.addData(1, 2, 3)

  val query = source.toDF()
    .writeStream
    .format("noop")
    .name("my_sink")
    .option("checkpointLocation", checkpointDir)
    .start()
  query.processAllAvailable()
  query.stop()

  // Re-open the commit log read-only and inspect the most recent entry.
  val commits = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
  val (_, latestMetadata) = commits.getLatest().getOrElse(fail("No commit recorded"))
  val v3Metadata = latestMetadata match {
    case v3: CommitMetadataV3 => v3
    case other => fail(s"Expected CommitMetadataV3, got $other")
  }

  val activeSink = v3Metadata.activeSinkMetadataInfo
  assert(activeSink.sinkName === "my_sink")
  assert(activeSink.isActive)
  assert(v3Metadata.sinkMetadataMap.size === 1)
}

// Renaming the sink must not erase the old one: after restarting the same checkpoint under a new
// sink name, the latest commit should list both sinks, with only the new one active.
testWithSinkEvolution("commit log V3 retains historical sink after rename") {
  val checkpointDir = newMetadataDir
  val input = MemoryStream[Int]

  // First batch under sink name "old_sink".
  input.addData(1)
  val q1 = input.toDF().writeStream
    .format("noop")
    .name("old_sink")
    .option("checkpointLocation", checkpointDir)
    .start()
  q1.processAllAvailable()
  q1.stop()

  // Restart with a new sink name "new_sink" against the same checkpoint.
  input.addData(2)
  val q2 = input.toDF().writeStream
    .format("noop")
    .name("new_sink")
    .option("checkpointLocation", checkpointDir)
    .start()
  q2.processAllAvailable()
  q2.stop()

  val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
  // Report a missing commit or an unexpected metadata version as a descriptive test failure
  // instead of a bare NoSuchElementException / ClassCastException.
  val v3 = commitLog.getLatest().map(_._2) match {
    case Some(v: CommitMetadataV3) => v
    case Some(other) => fail(s"Expected CommitMetadataV3, got $other")
    case None => fail("No commit recorded")
  }
  assert(v3.sinkMetadataMap.keySet === Set("old_sink", "new_sink"))
  assert(v3.activeSinkMetadataInfo.sinkName === "new_sink")
  assert(!v3.sinkMetadataMap("old_sink").isActive)
  assert(v3.sinkMetadataMap("new_sink").isActive)
}

// With sink evolution switched off, the query must keep writing the pre-V3 commit log format.
test("commit log stays V1/V2 when sink evolution is disabled") {
  val checkpointDir = newMetadataDir
  withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") {
    val input = MemoryStream[Int]
    input.addData(1, 2)
    val q = input.toDF().writeStream
      .format("noop")
      .option("checkpointLocation", checkpointDir)
      .start()
    q.processAllAvailable()
    q.stop()
  }

  val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
  // Fail with a clear message if nothing was committed, rather than throwing from Option.get.
  val (_, latest) = commitLog.getLatest().getOrElse(fail("No commit recorded"))
  assert(latest.version === CommitLog.VERSION_1 || latest.version === CommitLog.VERSION_2,
    s"Expected V1 or V2 commit log, got v${latest.version}")
  assert(!latest.isInstanceOf[CommitMetadataV3])
}

// A checkpoint started without sink evolution should switch to V3 commits once the feature is
// enabled on restart, recording only the sink that is active at that point.
testWithSinkEvolution("enabling sink evolution mid-checkpoint upgrades commit log to V3") {
  val checkpointDir = newMetadataDir
  val input = MemoryStream[Int]

  // First run with sink evolution disabled writes V1/V2, no sink metadata.
  withSQLConf(SQLConf.ENABLE_STREAMING_SINK_EVOLUTION.key -> "false") {
    input.addData(1)
    val q = input.toDF().writeStream
      .format("noop")
      .option("checkpointLocation", checkpointDir)
      .start()
    q.processAllAvailable()
    q.stop()
  }

  // Restart with sink evolution enabled, supplying a name. V3 should now be written; the
  // previous V1/V2 batches contribute no historical sinks.
  input.addData(2)
  val q = input.toDF().writeStream
    .format("noop")
    .name("upgraded_sink")
    .option("checkpointLocation", checkpointDir)
    .start()
  q.processAllAvailable()
  q.stop()

  val commitLog = new CommitLog(spark, s"$checkpointDir/commits", readOnly = true)
  // Report a missing commit or a non-V3 entry as a descriptive test failure instead of a bare
  // NoSuchElementException / ClassCastException.
  val v3 = commitLog.getLatest().map(_._2) match {
    case Some(v: CommitMetadataV3) => v
    case Some(other) => fail(s"Expected CommitMetadataV3, got $other")
    case None => fail("No commit recorded")
  }
  assert(v3.activeSinkMetadataInfo.sinkName === "upgraded_sink")
  assert(v3.sinkMetadataMap.size === 1)
}

// ==============
// Helper Methods
// ==============
Expand Down