[SPARK-57651][CONNECT][SS][TESTS] Deflake PythonPipelineSuite and add FileStreamSink batch-commit race diagnostics#56722
[SPARK-57651][CONNECT][SS][TESTS] Deflake PythonPipelineSuite and add FileStreamSink batch-commit race diagnostics#56722HyukjinKwon wants to merge 3 commits into
Conversation
…ding concurrent file-sink writers
The test 'flow progress events have correct python source code location' defined
two flows writing to the same streaming table 'table1' (its implicit flow plus the
'standalone_flow1' append flow). They run concurrently and share a single
FileStreamSink '_spark_metadata' commit log, so they race on the batch-0 commit in
ManifestFileCommitProtocol ('Race while writing batch 0'), making the test flaky.
Point 'standalone_flow1' at its own dedicated streaming table 'st2' so each flow
writes a distinct sink. The append-flow source-code-location assertions are
unchanged (target edited in place; create_streaming_table added below them so line
numbers do not shift).
…-commit race When two writers race on the same FileStreamSink _spark_metadata log, commitJob threw a bare 'Race while writing batch N'. Log the sink metadata path + batchId at ERROR and include them (plus the likely cause: concurrent streaming queries writing to the same output path) in the exception, so a recurrence in scheduled jobs is diagnosable from logs without re-reproducing the race.
| throw new IllegalStateException( | ||
| s"Race while writing batch $batchId to the file sink metadata log at '$path'. Another " + | ||
| "writer already committed this batch, which usually means multiple concurrent streaming " + | ||
| "queries are writing to the same output path (they share one _spark_metadata log).") |
There was a problem hiding this comment.
This seems to be almost identical information. Can we share the same error message for both logError and IllegalStateException?
There was a problem hiding this comment.
Good point — done in df7d9a3. Built the message once via the structured logging API and reused its rendered text (errorMsg.message) for the IllegalStateException, so the log and the exception can't drift. Thanks for the review!
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM (with a nit comment)
Automated code review (ReviewFlow /
|
| Severity | Critical | Major | Minor | Info | Total |
|---|---|---|---|---|---|
| Findings | 0 | 0 | 0 | 0 | 0 |
No issues found in the changed files
(sql/connect/server/.../PythonPipelineSuite.scala,
sql/core/.../ManifestFileCommitProtocol.scala).
Notes from manual double-check:
- The test change is data-only (retarget
standalone_flow1to its ownst2table + a
create_streaming_tablecall added below the asserted lines), so the source-code-location
assertions are unaffected — confirmed by 10× repeat runs ofPythonPipelineSuiteon CI
(106/106 each): https://github.com/HyukjinKwon/spark/actions/runs/28074747108 - The
ManifestFileCommitProtocolchange is diagnostics-only (richer log + exception message on an
already-existing race); no behavioral change.
Verdict: no blocking issues. Posting as a comment, not an approval.
…legalStateException Address review comment: build the race message once via the structured logging API and reuse its rendered text (errorMsg.message) for the IllegalStateException, instead of duplicating near-identical strings. Co-authored-by: Isaac
…FileStreamSink batch-commit race diagnostics
### What changes were proposed in this pull request?
Two changes:
1. **Deflake `PythonPipelineSuite."flow progress events have correct python source code location"`.**
The test defined **two** flows writing to the same streaming table `table1`: the implicit flow
created by `dp.table def table1()` and the `dp.append_flow(target='table1') def standalone_flow1()`
append flow. Because the two flows have no data dependency on each other, the pipeline scheduler
runs them **concurrently**, and since both write to the same file-based streaming table they share a
single `FileStreamSink` `_spark_metadata` commit log. They then race on the batch-0 commit in
`ManifestFileCommitProtocol.commitJob`, which throws `IllegalStateException("Race while writing batch 0")`.
This made the test intermittently fail (e.g. it passes on most master runs but failed in the run linked below).
The fix points `standalone_flow1` at its own dedicated streaming table created with
`dp.create_streaming_table('st2')`, so each flow writes to a distinct sink and there is no shared
`_spark_metadata`. The test's intent (verifying that an append flow's Python source-code location is
propagated) is unchanged; the asserted line numbers are preserved (the `target` is edited in place and
the `create_streaming_table` call is added below the asserted `def` lines so nothing shifts).
Note: serializing the two flows would **not** be a correct alternative — `FileStreamSink.addBatch`
skips a batch when `batchId <= latestBatchId`, so a serialized second writer would silently drop its
batch rather than error. Separate sinks is the correct fix.
2. **Add diagnostics to the `ManifestFileCommitProtocol` batch-commit race.** Previously `commitJob`
threw a bare `Race while writing batch N`. It now logs the sink `_spark_metadata` path and the
batchId at ERROR, and the exception message names the likely cause (multiple concurrent streaming
queries writing to the same output path). This way, if the same class of race ever resurfaces in a
scheduled job or a real pipeline, it is diagnosable from the logs alone without having to reproduce it.
### Why are the changes needed?
`PythonPipelineSuite` is a flaky test on CI (the race surfaces intermittently in the
`streaming, sql-kafka-0-10, ..., connect, avro` module group). The goal is a green, non-flaky build.
### Does this PR introduce _any_ user-facing change?
No. The only production change is a clearer error message / additional ERROR log when the
(already-existing) batch-commit race is hit.
### How was this patch tested?
A trimmed workflow on a fork builds `sql/connect/server` and runs **only** `PythonPipelineSuite`,
**repeated 10×** (a single green run does not prove a flaky race is gone). All 10 runs were green
(106 tests, 0 failures each).
- ❌ Before (race observed) — apache/spark master push run, module group incl. `connect`:
https://github.com/apache/spark/actions/runs/27961702441/job/82748952932
- ✅ After (10× green) — fork validation run, `PythonPipelineSuite` x10:
https://github.com/HyukjinKwon/spark/actions/runs/28074747108
This pull request and its description were written by Isaac.
Closes #56722 from HyukjinKwon/SPARK-57651.
Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com>
(cherry picked from commit 0252931)
Signed-off-by: Hyukjin Kwon <hyukjin.kwon@databricks.com>
|
Merged to master and branch-4.x. |
What changes were proposed in this pull request?
Two changes:
Deflake
PythonPipelineSuite."flow progress events have correct python source code location".The test defined two flows writing to the same streaming table
table1: the implicit flowcreated by
@dp.table def table1()and the@dp.append_flow(target='table1') def standalone_flow1()append flow. Because the two flows have no data dependency on each other, the pipeline scheduler
runs them concurrently, and since both write to the same file-based streaming table they share a
single
FileStreamSink_spark_metadatacommit log. They then race on the batch-0 commit inManifestFileCommitProtocol.commitJob, which throwsIllegalStateException("Race while writing batch 0").This made the test intermittently fail (e.g. it passes on most master runs but failed in the run linked below).
The fix points
standalone_flow1at its own dedicated streaming table created withdp.create_streaming_table('st2'), so each flow writes to a distinct sink and there is no shared_spark_metadata. The test's intent (verifying that an append flow's Python source-code location ispropagated) is unchanged; the asserted line numbers are preserved (the
targetis edited in place andthe
create_streaming_tablecall is added below the asserteddeflines so nothing shifts).Note: serializing the two flows would not be a correct alternative —
FileStreamSink.addBatchskips a batch when
batchId <= latestBatchId, so a serialized second writer would silently drop itsbatch rather than error. Separate sinks is the correct fix.
Add diagnostics to the
ManifestFileCommitProtocolbatch-commit race. PreviouslycommitJobthrew a bare
Race while writing batch N. It now logs the sink_spark_metadatapath and thebatchId at ERROR, and the exception message names the likely cause (multiple concurrent streaming
queries writing to the same output path). This way, if the same class of race ever resurfaces in a
scheduled job or a real pipeline, it is diagnosable from the logs alone without having to reproduce it.
Why are the changes needed?
PythonPipelineSuiteis a flaky test on CI (the race surfaces intermittently in thestreaming, sql-kafka-0-10, ..., connect, avromodule group). The goal is a green, non-flaky build.Does this PR introduce any user-facing change?
No. The only production change is a clearer error message / additional ERROR log when the
(already-existing) batch-commit race is hit.
How was this patch tested?
A trimmed workflow on a fork builds
sql/connect/serverand runs onlyPythonPipelineSuite,repeated 10× (a single green run does not prove a flaky race is gone). All 10 runs were green
(106 tests, 0 failures each).
connect:https://github.com/apache/spark/actions/runs/27961702441/job/82748952932
PythonPipelineSuitex10:https://github.com/HyukjinKwon/spark/actions/runs/28074747108
This pull request and its description were written by Isaac.