Skip to content

[SPARK-56972][SS][4.x] Persist sink name in V3 commit log via MicroBatchExecution#56706

Closed
ericm-db wants to merge 1 commit into
apache:branch-4.xfrom
ericm-db:SPARK-56972-branch-4.x
Closed

[SPARK-56972][SS][4.x] Persist sink name in V3 commit log via MicroBatchExecution#56706
ericm-db wants to merge 1 commit into
apache:branch-4.xfrom
ericm-db:SPARK-56972-branch-4.x

Conversation

@ericm-db

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Backport of [SPARK-56972] (apache/spark#56020) to branch-4.x.

Wire the sink name through MicroBatchExecution so that, when sink evolution is enabled, each committed batch writes a CommitMetadataV3 whose sinkMetadataMap records the current sink as the active entry alongside any sinks that were active in earlier batches:

  • Add a per-execution sinkMetadataMap that is hydrated from the latest CommitMetadataV3 in populateStartOffsets.
  • When spark.sql.streaming.queryEvolution.enableSinkEvolution is true, the commit-log write in runBatch produces CommitMetadataV3 with every prior entry marked isActive = false and the current (sinkName, sink.getClass.getName) entered as isActive = true.
  • When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged.

This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred.

Prerequisites. The predecessors SPARK-56970 (apache/spark#56018), SPARK-56971 (apache/spark#56019), and SPARK-56719 (the DataStreamWriter.name() API) are already present in branch-4.x, so this is a standalone cherry-pick of cfa759af5b6. The only conflict was an import-line collision in MicroBatchExecution.scala (branch-4.x does not carry the master-only CheckpointVersionManager/OffsetLogType symbols in that import); the resolution keeps the branch's existing import and adds only CommitLog, CommitMetadataV3, and SinkMetadataInfo. The resulting diff is identical to the master commit (+156/-3).

Why are the changes needed?

SPARK-56719 introduced the DataStreamWriter.name() API and the in-memory sinkName plumbing inside MicroBatchExecution, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable.

Does this PR introduce any user-facing change?

Behavior change only when enableSinkEvolution is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off.

How was this patch tested?

  • Cherry-picked cfa759af5b6 from master; resolved the single import-line conflict in MicroBatchExecution.scala.
  • StreamingSinkEvolutionSuite passes on branch-4.x (12 tests, including the four new V3 commit-log cases: named-sink active entry, historical-sink retention across rename, V1/V2 preserved when disabled, and mid-checkpoint upgrade to V3).
  • sql/core main and test sources compile cleanly on branch-4.x (build/sbt sql/Test/compile).

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-8)

This pull request and its description were written by Isaac.

…ecution

Wire the sink name through `MicroBatchExecution` so that, when sink
evolution is enabled, each committed batch writes a
`CommitMetadataV3` whose `sinkMetadataMap` records the current sink as
the active entry alongside any sinks that were active in earlier
batches:

- Add a per-execution `sinkMetadataMap` that is hydrated from the latest
  `CommitMetadataV3` in `populateStartOffsets`.
- When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true,
  the commit-log write in `runBatch` produces `CommitMetadataV3` with
  every prior entry marked `isActive = false` and the current
  `(sinkName, sink.getClass.getName)` entered as `isActive = true`.
- When sink evolution is disabled, the existing V1/V2 commit-log path
  is preserved unchanged.

This is the minimal write-then-read parity for the sink evolution
feature added in SPARK-56719. Provider-mismatch and sink-reuse
validation are intentionally deferred.

SPARK-56719 introduced the `DataStreamWriter.name()` API and the
in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the
sink name was not yet persisted to the checkpoint. Without
persistence, restarts cannot observe historical sink identity and the
feature is not durable.

Behavior change only when `enableSinkEvolution` is true (off by
default): the commit log directory now contains V3 commit log files
instead of V1/V2 files. Wire format compatibility is preserved when
the flag is left off.

Added four new tests in `StreamingSinkEvolutionSuite`:
- V3 commit log records the active sink for a named query.
- Renaming the sink across a restart retains the previous sink as
  `isActive = false` and marks the new one active.
- With sink evolution disabled, the commit log remains V1/V2.
- Enabling sink evolution on a checkpoint that previously used
  V1/V2 transparently upgrades to V3 on the next commit.

Existing `StreamingSinkEvolutionSuite`, `CommitLogSuite`,
`MicroBatchExecutionSuite`, and
`AsyncProgressTrackingMicroBatchExecutionSuite` all pass.

Generated-by: Claude Code (claude-opus-4-7)

Co-authored-by: Isaac
@dongjoon-hyun dongjoon-hyun changed the title [SPARK-56972][SS] Persist sink name in V3 commit log via MicroBatchExecution [SPARK-56972][SS][4.x] Persist sink name in V3 commit log via MicroBatchExecution Jun 23, 2026

@uros-b uros-b left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a clean 4.x backport, thanks @ericm-db! cc @anishshri-db

@cloud-fan

Copy link
Copy Markdown
Contributor

thanks, merging to 4.x

cloud-fan pushed a commit that referenced this pull request Jun 24, 2026
…tchExecution

### What changes were proposed in this pull request?

Backport of [SPARK-56972] ([#56020](#56020)) to `branch-4.x`.

Wire the sink name through `MicroBatchExecution` so that, when sink evolution is enabled, each committed batch writes a `CommitMetadataV3` whose `sinkMetadataMap` records the current sink as the active entry alongside any sinks that were active in earlier batches:

- Add a per-execution `sinkMetadataMap` that is hydrated from the latest `CommitMetadataV3` in `populateStartOffsets`.
- When `spark.sql.streaming.queryEvolution.enableSinkEvolution` is true, the commit-log write in `runBatch` produces `CommitMetadataV3` with every prior entry marked `isActive = false` and the current `(sinkName, sink.getClass.getName)` entered as `isActive = true`.
- When sink evolution is disabled, the existing V1/V2 commit-log path is preserved unchanged.

This is the minimal write-then-read parity for the sink evolution feature added in SPARK-56719. Provider-mismatch and sink-reuse validation are intentionally deferred.

**Prerequisites.** The predecessors SPARK-56970 ([#56018](#56018)), SPARK-56971 ([#56019](#56019)), and SPARK-56719 (the `DataStreamWriter.name()` API) are already present in `branch-4.x`, so this is a standalone cherry-pick of `cfa759af5b6`. The only conflict was an import-line collision in `MicroBatchExecution.scala` (`branch-4.x` does not carry the master-only `CheckpointVersionManager`/`OffsetLogType` symbols in that import); the resolution keeps the branch's existing import and adds only `CommitLog`, `CommitMetadataV3`, and `SinkMetadataInfo`. The resulting diff is identical to the master commit (+156/-3).

### Why are the changes needed?

SPARK-56719 introduced the `DataStreamWriter.name()` API and the in-memory `sinkName` plumbing inside `MicroBatchExecution`, but the sink name was not yet persisted to the checkpoint. Without persistence, restarts cannot observe historical sink identity and the feature is not durable.

### Does this PR introduce _any_ user-facing change?

Behavior change only when `enableSinkEvolution` is true (off by default): the commit log directory now contains V3 commit log files instead of V1/V2 files. Wire format compatibility is preserved when the flag is left off.

### How was this patch tested?

- Cherry-picked `cfa759af5b6` from master; resolved the single import-line conflict in `MicroBatchExecution.scala`.
- `StreamingSinkEvolutionSuite` passes on `branch-4.x` (12 tests, including the four new V3 commit-log cases: named-sink active entry, historical-sink retention across rename, V1/V2 preserved when disabled, and mid-checkpoint upgrade to V3).
- `sql/core` main and test sources compile cleanly on `branch-4.x` (`build/sbt sql/Test/compile`).

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-8)

This pull request and its description were written by Isaac.

Closes #56706 from ericm-db/SPARK-56972-branch-4.x.

Authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan cloud-fan closed this Jun 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants