Skip to content

[SPARK-57651][CONNECT][SS][TESTS] Deflake PythonPipelineSuite and add FileStreamSink batch-commit race diagnostics#56722

Closed
HyukjinKwon wants to merge 3 commits into
apache:masterfrom
HyukjinKwon:SPARK-57651
Closed

[SPARK-57651][CONNECT][SS][TESTS] Deflake PythonPipelineSuite and add FileStreamSink batch-commit race diagnostics#56722
HyukjinKwon wants to merge 3 commits into
apache:masterfrom
HyukjinKwon:SPARK-57651

Conversation

@HyukjinKwon

Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

Two changes:

  1. Deflake PythonPipelineSuite."flow progress events have correct python source code location".
    The test defined two flows writing to the same streaming table table1: the implicit flow
    created by @dp.table def table1() and the @dp.append_flow(target='table1') def standalone_flow1()
    append flow. Because the two flows have no data dependency on each other, the pipeline scheduler
    runs them concurrently, and since both write to the same file-based streaming table they share a
    single FileStreamSink _spark_metadata commit log. They then race on the batch-0 commit in
    ManifestFileCommitProtocol.commitJob, which throws IllegalStateException("Race while writing batch 0").
    This made the test intermittently fail (e.g. it passes on most master runs but failed in the run linked below).

    The fix points standalone_flow1 at its own dedicated streaming table created with
    dp.create_streaming_table('st2'), so each flow writes to a distinct sink and there is no shared
    _spark_metadata. The test's intent (verifying that an append flow's Python source-code location is
    propagated) is unchanged; the asserted line numbers are preserved (the target is edited in place and
    the create_streaming_table call is added below the asserted def lines so nothing shifts).

    Note: serializing the two flows would not be a correct alternative — FileStreamSink.addBatch
    skips a batch when batchId <= latestBatchId, so a serialized second writer would silently drop its
    batch rather than error. Separate sinks is the correct fix.

  2. Add diagnostics to the ManifestFileCommitProtocol batch-commit race. Previously commitJob
    threw a bare Race while writing batch N. It now logs the sink _spark_metadata path and the
    batchId at ERROR, and the exception message names the likely cause (multiple concurrent streaming
    queries writing to the same output path). This way, if the same class of race ever resurfaces in a
    scheduled job or a real pipeline, it is diagnosable from the logs alone without having to reproduce it.

Why are the changes needed?

PythonPipelineSuite is a flaky test on CI (the race surfaces intermittently in the
streaming, sql-kafka-0-10, ..., connect, avro module group). The goal is a green, non-flaky build.

Does this PR introduce any user-facing change?

No. The only production change is a clearer error message / additional ERROR log when the
(already-existing) batch-commit race is hit.

How was this patch tested?

A trimmed workflow on a fork builds sql/connect/server and runs only PythonPipelineSuite,
repeated 10× (a single green run does not prove a flaky race is gone). All 10 runs were green
(106 tests, 0 failures each).

This pull request and its description were written by Isaac.

…ding concurrent file-sink writers

The test 'flow progress events have correct python source code location' defined
two flows writing to the same streaming table 'table1' (its implicit flow plus the
'standalone_flow1' append flow). They run concurrently and share a single
FileStreamSink '_spark_metadata' commit log, so they race on the batch-0 commit in
ManifestFileCommitProtocol ('Race while writing batch 0'), making the test flaky.

Point 'standalone_flow1' at its own dedicated streaming table 'st2' so each flow
writes a distinct sink. The append-flow source-code-location assertions are
unchanged (target edited in place; create_streaming_table added below them so line
numbers do not shift).
…-commit race

When two writers race on the same FileStreamSink _spark_metadata log, commitJob threw
a bare 'Race while writing batch N'. Log the sink metadata path + batchId at ERROR and
include them (plus the likely cause: concurrent streaming queries writing to the same
output path) in the exception, so a recurrence in scheduled jobs is diagnosable from
logs without re-reproducing the race.
throw new IllegalStateException(
s"Race while writing batch $batchId to the file sink metadata log at '$path'. Another " +
"writer already committed this batch, which usually means multiple concurrent streaming " +
"queries are writing to the same output path (they share one _spark_metadata log).")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be almost identical information. Can we share the same error message for both logError and IllegalStateException?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — done in df7d9a3. Built the message once via the structured logging API and reused its rendered text (errorMsg.message) for the IllegalStateException, so the log and the exception can't drift. Thanks for the review!

@dongjoon-hyun dongjoon-hyun left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM (with a nit comment)

@HyukjinKwon

Copy link
Copy Markdown
Member Author

Automated code review (ReviewFlow / isaac review)

Ran a multi-reviewer pass (logical + dead-code reviewers, 3 adaptive runs) over this PR's diff
(2 files, +21/-2):

Severity Critical Major Minor Info Total
Findings 0 0 0 0 0

No issues found in the changed files
(sql/connect/server/.../PythonPipelineSuite.scala,
sql/core/.../ManifestFileCommitProtocol.scala).

Notes from manual double-check:

  • The test change is data-only (retarget standalone_flow1 to its own st2 table + a
    create_streaming_table call added below the asserted lines), so the source-code-location
    assertions are unaffected — confirmed by 10× repeat runs of PythonPipelineSuite on CI
    (106/106 each): https://github.com/HyukjinKwon/spark/actions/runs/28074747108
  • The ManifestFileCommitProtocol change is diagnostics-only (richer log + exception message on an
    already-existing race); no behavioral change.

Verdict: no blocking issues. Posting as a comment, not an approval.

…legalStateException

Address review comment: build the race message once via the structured logging API
and reuse its rendered text (errorMsg.message) for the IllegalStateException, instead
of duplicating near-identical strings.

Co-authored-by: Isaac
HyukjinKwon added a commit that referenced this pull request Jun 24, 2026
…FileStreamSink batch-commit race diagnostics

### What changes were proposed in this pull request?

Two changes:

1. **Deflake `PythonPipelineSuite."flow progress events have correct python source code location"`.**
   The test defined **two** flows writing to the same streaming table `table1`: the implicit flow
   created by `dp.table def table1()` and the `dp.append_flow(target='table1') def standalone_flow1()`
   append flow. Because the two flows have no data dependency on each other, the pipeline scheduler
   runs them **concurrently**, and since both write to the same file-based streaming table they share a
   single `FileStreamSink` `_spark_metadata` commit log. They then race on the batch-0 commit in
   `ManifestFileCommitProtocol.commitJob`, which throws `IllegalStateException("Race while writing batch 0")`.
   This made the test intermittently fail (e.g. it passes on most master runs but failed in the run linked below).

   The fix points `standalone_flow1` at its own dedicated streaming table created with
   `dp.create_streaming_table('st2')`, so each flow writes to a distinct sink and there is no shared
   `_spark_metadata`. The test's intent (verifying that an append flow's Python source-code location is
   propagated) is unchanged; the asserted line numbers are preserved (the `target` is edited in place and
   the `create_streaming_table` call is added below the asserted `def` lines so nothing shifts).

   Note: serializing the two flows would **not** be a correct alternative — `FileStreamSink.addBatch`
   skips a batch when `batchId <= latestBatchId`, so a serialized second writer would silently drop its
   batch rather than error. Separate sinks is the correct fix.

2. **Add diagnostics to the `ManifestFileCommitProtocol` batch-commit race.** Previously `commitJob`
   threw a bare `Race while writing batch N`. It now logs the sink `_spark_metadata` path and the
   batchId at ERROR, and the exception message names the likely cause (multiple concurrent streaming
   queries writing to the same output path). This way, if the same class of race ever resurfaces in a
   scheduled job or a real pipeline, it is diagnosable from the logs alone without having to reproduce it.

### Why are the changes needed?

`PythonPipelineSuite` is a flaky test on CI (the race surfaces intermittently in the
`streaming, sql-kafka-0-10, ..., connect, avro` module group). The goal is a green, non-flaky build.

### Does this PR introduce _any_ user-facing change?

No. The only production change is a clearer error message / additional ERROR log when the
(already-existing) batch-commit race is hit.

### How was this patch tested?

A trimmed workflow on a fork builds `sql/connect/server` and runs **only** `PythonPipelineSuite`,
**repeated 10×** (a single green run does not prove a flaky race is gone). All 10 runs were green
(106 tests, 0 failures each).

- ❌ Before (race observed) — apache/spark master push run, module group incl. `connect`:
  https://github.com/apache/spark/actions/runs/27961702441/job/82748952932
- ✅ After (10× green) — fork validation run, `PythonPipelineSuite` x10:
  https://github.com/HyukjinKwon/spark/actions/runs/28074747108

This pull request and its description were written by Isaac.

Closes #56722 from HyukjinKwon/SPARK-57651.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com>
(cherry picked from commit 0252931)
Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com>
@HyukjinKwon

Copy link
Copy Markdown
Member Author

Merged to master and branch-4.x.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants