[SPARK-56972][SS][4.x] Persist sink name in V3 commit log via MicroBatchExecution#56706
Closed
ericm-db wants to merge 1 commit into
Closed
[SPARK-56972][SS][4.x] Persist sink name in V3 commit log via MicroBatchExecution#56706ericm-db wants to merge 1 commit into
ericm-db wants to merge 1 commit into
Conversation
…ecution Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches: - Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`. - When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`. - When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged. This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred. SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable. Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off. Added four new tests in `StreamingSinkEvolutionSuite`: - V3 commit log records the active sink for a named query. - Renaming the sink across a restart retains the previous sink as `isActive = false` and marks the new one active. - With sink evolution disabled, the commit log remains V1/V2. - Enabling sink evolution on a checkpoint that previously used V1/V2 transparently upgrades to V3 on the next commit. Existing `StreamingSinkEvolutionSuite`, `CommitLogSuite`, `MicroBatchExecutionSuite`, and `AsyncProgressTrackingMicroBatchExecutionSuite` all pass. Generated-by: Claude Code (claude-opus-4-7) Co-authored-by: Isaac
uros-b
approved these changes
Jun 24, 2026
uros-b
left a comment
Member
There was a problem hiding this comment.
This looks like a clean 4.x backport, thanks @ericm-db! cc @anishshri-db
Contributor
|
thanks, merging to 4.x |
cloud-fan
pushed a commit
that referenced
this pull request
Jun 24, 2026
…tchExecution ### What changes were proposed in this pull request? Backport of [SPARK-56972] ([#56020](#56020)) to `branch-4.x`. Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches: - Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`. - When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`. - When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged. This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred. **Prerequisites.** The predecessors SPARK-56970 ([#56018](#56018)), SPARK-56971 ([#56019](#56019)), and SPARK-56719 (the `DataStreamWriter.name()` API) are already present in `branch-4.x`, so this is a standalone cherry-pick of `cfa759af5b6`. The only conflict was an import-line collision in `MicroBatchExecution.scala` (`branch-4.x` does not carry the master-only `CheckpointVersionManager`/`OffsetLogType` symbols in that import); the resolution keeps the branch's existing import and adds only `CommitLog`, `CommitMetadataV3`, and `SinkMetadataInfo`. The resulting diff is identical to the master commit (+156/-3). ### Why are the changes needed? SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable. ### Does this PR introduce _any_ user-facing change? Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off. ### How was this patch tested? - Cherry-picked `cfa759af5b6` from master; resolved the single import-line conflict in `MicroBatchExecution.scala`. - `StreamingSinkEvolutionSuite` passes on `branch-4.x` (12 tests, including the four new V3 commit-log cases: named-sink active entry, historical-sink retention across rename, V1/V2 preserved when disabled, and mid-checkpoint upgrade to V3). - `sql/core` main and test sources compile cleanly on `branch-4.x` (`build/sbt sql/Test/compile`). ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code (claude-opus-4-8) This pull request and its description were written by Isaac. Closes #56706 from ericm-db/SPARK-56972-branch-4.x. Authored-by: Eric Marnadi <eric.marnadi@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Backport of [SPARK-56972] (apache/spark#56020) to
branch-4.x.Wire the sink name through
MicroBatchExecutionso that, when sink evolution is enabled, each committed batch writes aCommitMetadataV3whosesinkMetadataMaprecords the current sink as the active entry alongside any sinks that were active in earlier batches:sinkMetadataMapthat is hydrated from the latestCommitMetadataV3inpopulateStartOffsets.spark.sql.streaming.queryEvolution.enableSinkEvolutionis true, the commit-log write inrunBatchproducesCommitMetadataV3with every prior entry markedisActive = falseand the current(sinkName, sink.getClass.getName)entered asisActive = true.This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred.
Prerequisites. The predecessors SPARK-56970 (apache/spark#56018), SPARK-56971 (apache/spark#56019), and SPARK-56719 (the
DataStreamWriter.name()API) are already present inbranch-4.x, so this is a standalone cherry-pick ofcfa759af5b6. The only conflict was an import-line collision inMicroBatchExecution.scala(branch-4.xdoes not carry the master-onlyCheckpointVersionManager/OffsetLogTypesymbols in that import); the resolution keeps the branch's existing import and adds onlyCommitLog,CommitMetadataV3, andSinkMetadataInfo. The resulting diff is identical to the master commit (+156/-3).Why are the changes needed?
SPARK-56719 introduced the
DataStreamWriter.name()API and the in-memorysinkNameplumbing insideMicroBatchExecution, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable.Does this PR introduce any user-facing change?
Behavior change only when
enableSinkEvolutionis true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off.How was this patch tested?
cfa759af5b6from master; resolved the single import-line conflict inMicroBatchExecution.scala.StreamingSinkEvolutionSuitepasses onbranch-4.x(12 tests, including the four new V3 commit-log cases: named-sink active entry, historical-sink retention across rename, V1/V2 preserved when disabled, and mid-checkpoint upgrade to V3).sql/coremain and test sources compile cleanly onbranch-4.x(build/sbt sql/Test/compile).Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-8)
This pull request and its description were written by Isaac.