Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ class PythonPipelineSuite
}

test("flow progress events have correct python source code location") {
// `standalone_flow1` writes to its own dedicated streaming table `st2` (rather than appending
// to `table1`, which already has its own implicit flow). Two flows writing concurrently to the
// same file-based streaming table share a single `_spark_metadata` log and race on the batch 0
// commit (`ManifestFileCommitProtocol`: "Race while writing batch 0"), which made this test
// flaky. Keeping each flow on a separate destination removes the race without changing what the
// test verifies (source-code-location propagation for an append flow).
val unresolvedGraph = buildGraph(pythonText = """
|@dp.table(
| comment = 'my table'
Expand All @@ -197,10 +203,12 @@ class PythonPipelineSuite
| return df.select("age")
|
|@dp.append_flow(
| target = 'table1'
| target = 'st2'
|)
|def standalone_flow1():
| return spark.readStream.table('mv2')
|
|dp.create_streaming_table('st2')
|""".stripMargin)

val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph, storageRoot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,19 @@ class ManifestFileCommitProtocol(jobId: String, path: String)
if (fileLog.add(batchId, fileStatuses)) {
logInfo(log"Committed batch ${MDC(BATCH_ID, batchId)}")
} else {
throw new IllegalStateException(s"Race while writing batch $batchId")
// Reaching here means `fileLog.add` found this batchId already committed to the sink
// metadata log at `path`. This is almost always two concurrent streaming queries writing
// to the same output path: they share a single `_spark_metadata` log and cannot coexist.
// Log the path + batchId at ERROR so a recurrence in scheduled jobs is diagnosable from the
// logs alone, without re-reproducing the race.
// Build the message once and reuse it for both the log and the exception so they stay in
// sync (see review on SPARK-57651).
val errorMsg = log"Race while writing batch ${MDC(BATCH_ID, batchId)} to the file sink " +
log"metadata log at ${MDC(PATH, path)}: another writer already committed this batch. " +
log"This usually means multiple concurrent streaming queries are writing to the same " +
log"output path (they share one _spark_metadata log)."
logError(errorMsg)
throw new IllegalStateException(errorMsg.message)
}
}

Expand Down