diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 834e2d8144e13..8eaf008c58098 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -180,6 +180,12 @@ class PythonPipelineSuite } test("flow progress events have correct python source code location") { + // `standalone_flow1` writes to its own dedicated streaming table `st2` (rather than appending + // to `table1`, which already has its own implicit flow). Two flows writing concurrently to the + // same file-based streaming table share a single `_spark_metadata` log and race on the batch 0 + // commit (`ManifestFileCommitProtocol`: "Race while writing batch 0"), which made this test + // flaky. Keeping each flow on a separate destination removes the race without changing what the + // test verifies (source-code-location propagation for an append flow). val unresolvedGraph = buildGraph(pythonText = """ |@dp.table( | comment = 'my table' @@ -197,10 +203,12 @@ class PythonPipelineSuite | return df.select("age") | |@dp.append_flow( - | target = 'table1' + | target = 'st2' |) |def standalone_flow1(): | return spark.readStream.table('mv2') + | + |dp.create_streaming_table('st2') |""".stripMargin) val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph, storageRoot) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 66e90ec689131..804faecd37f4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -78,7 +78,19 @@ class ManifestFileCommitProtocol(jobId: String, path: String) if (fileLog.add(batchId, fileStatuses)) { logInfo(log"Committed batch ${MDC(BATCH_ID, batchId)}") } else { - throw new IllegalStateException(s"Race while writing batch $batchId") + // Reaching here means `fileLog.add` found this batchId already committed to the sink + // metadata log at `path`. This is almost always two concurrent streaming queries writing + // to the same output path: they share a single `_spark_metadata` log and cannot coexist. + // Log the path + batchId at ERROR so a recurrence in scheduled jobs is diagnosable from the + // logs alone, without re-reproducing the race. + // Build the message once and reuse it for both the log and the exception so they stay in + // sync (see review on SPARK-57651). + val errorMsg = log"Race while writing batch ${MDC(BATCH_ID, batchId)} to the file sink " + + log"metadata log at ${MDC(PATH, path)}: another writer already committed this batch. " + + log"This usually means multiple concurrent streaming queries are writing to the same " + + log"output path (they share one _spark_metadata log)." + logError(errorMsg) + throw new IllegalStateException(errorMsg.message) } }