diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index c9256b0a8f33c..f8d7041933907 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -703,6 +703,9 @@ public boolean contains(final UTF8String substring) { * Returns the byte at (byte) position `byteIndex`. If byte index is invalid, returns 0. */ public byte getByte(int byteIndex) { + if (byteIndex < 0 || byteIndex >= numBytes) { + return 0; + } return Platform.getByte(base, offset + byteIndex); } diff --git a/python/pyspark/sql/connect/client/core.py b/python/pyspark/sql/connect/client/core.py index 6e0d4cbcf1ef7..db6067f25e096 100644 --- a/python/pyspark/sql/connect/client/core.py +++ b/python/pyspark/sql/connect/client/core.py @@ -703,6 +703,14 @@ class SparkConnectClient(object): Conceptually the remote spark session that communicates with the server """ + # Thread id currently executing a best-effort ML-cache RPC (clean_cache / delete), or None. + # Used to detect re-entrant ML-cache RPCs on the same thread: a CPython finalizer + # (RemoteModelRef.__del__ -> del_remote_cache -> _delete_ml_cache) can fire while the GIL is + # released inside a blocking ML-cache RPC, issuing a second blocking RPC on the same thread + # that deadlocks the gRPC channel and hangs until the test/process timeout. See the guards in + # _cleanup_ml_cache / _delete_ml_cache. + _ml_cache_rpc_thread: Optional[int] = None + def __init__( self, connection: Union[str, ChannelBuilder], @@ -2397,12 +2405,31 @@ def _delete_ml_cache(self, cache_ids: List[str], evict_only: bool = False) -> Li # try best to delete the cache try: if len(cache_ids) > 0: + # Re-entrancy guard: this is reachable from a RemoteModelRef finalizer + # (__del__ -> del_remote_cache), which CPython may run on this thread while the + # GIL is released inside another in-flight ML-cache RPC (e.g. _cleanup_ml_cache's + # blocking call). Issuing a second blocking RPC re-entrantly can deadlock the gRPC + # channel and hang until the test/process timeout. The nested delete is redundant + # (the in-flight cleanup/delete is already releasing server-side state, and the + # server evicts on session end), so skip it and log so a recurrence in scheduled + # jobs is visible instead of a silent multi-minute hang. + if self._ml_cache_rpc_thread == threading.get_ident(): + logger.warning( + "Skipping re-entrant ML cache delete of %s object ref(s) while another " + "ML-cache RPC is in flight on this thread (avoids a re-entrant gRPC hang).", + len(cache_ids), + ) + return [] command = pb2.Command() command.ml_command.delete.obj_refs.extend( [pb2.ObjectRef(id=cache_id) for cache_id in cache_ids] ) command.ml_command.delete.evict_only = evict_only - _, properties, _ = self.execute_command(command) + self._ml_cache_rpc_thread = threading.get_ident() + try: + _, properties, _ = self.execute_command(command) + finally: + self._ml_cache_rpc_thread = None assert properties is not None @@ -2435,9 +2462,22 @@ def _on_exit(self) -> None: def _cleanup_ml_cache(self) -> None: try: + # See _delete_ml_cache for the re-entrancy rationale. If a finalizer-driven ML-cache + # RPC is already in flight on this thread, skip this nested cleanup rather than risk a + # re-entrant gRPC hang; the in-flight RPC plus server-side session eviction cover it. + if self._ml_cache_rpc_thread == threading.get_ident(): + logger.warning( + "Skipping re-entrant ML cache cleanup while another ML-cache RPC is in flight " + "on this thread (avoids a re-entrant gRPC hang)." + ) + return command = pb2.Command() command.ml_command.clean_cache.SetInParent() - self.execute_command(command) + self._ml_cache_rpc_thread = threading.get_ident() + try: + self.execute_command(command) + finally: + self._ml_cache_rpc_thread = None except Exception: pass diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala index 184868a0df233..9c149a858018a 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala @@ -41,11 +41,20 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { val session = spark import session.implicits._ implicit val ec: ExecutionContextExecutor = ExecutionContext.global + // Run the long-running query through a single call site, and warm it up once before any + // interrupt. Otherwise the very first execution has to ship/fetch the map closure and its + // TypeTag artifact classes to the executor on demand; if an interrupt lands during that + // first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead of + // OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN). + def runMapQuery(sleepMs: Long): Unit = { + spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect() + } + runMapQuery(0) val q1 = Future { - spark.range(10).map(n => { Thread.sleep(30000); n }).collect() + runMapQuery(30000) } val q2 = Future { - spark.range(10).map(n => { Thread.sleep(30000); n }).collect() + runMapQuery(30000) } var q1Interrupted = false var q2Interrupted = false @@ -88,6 +97,16 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { @volatile var finished = false val interrupted = mutable.ListBuffer[String]() + // Run the long-running query through a single call site, and warm it up once before the + // background interruptor starts. Otherwise the first execution has to ship/fetch the map + // closure and its TypeTag artifact classes to the executor on demand; if an interrupt lands + // during that first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead + // of OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN). + def runMapQuery(sleepMs: Long): Unit = { + spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect() + } + runMapQuery(0) + val interruptor = Future { eventually(timeout(20.seconds), interval(1.seconds)) { val ids = spark.interruptAll() @@ -96,15 +115,22 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { } finished } - val e1 = intercept[SparkException] { - spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect() - } - assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1") - val e2 = intercept[SparkException] { - spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect() + try { + val e1 = intercept[SparkException] { + runMapQuery(30.seconds.toMillis) + } + assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1") + val e2 = intercept[SparkException] { + runMapQuery(30.seconds.toMillis) + } + assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2") + } finally { + // Always release the background interruptor. If an assertion above fails, this prevents the + // interruptor Future from continuing to call interruptAll() and canceling operations of the + // subsequent tests in this suite (which previously caused cascading OPERATION_CANCELED + // failures across the whole suite). + finished = true } - assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2") - finished = true assert(awaitResult(interruptor, 10.seconds)) assert(interrupted.length == 2, s"Interrupted operations: $interrupted.") } diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala index 834e2d8144e13..8eaf008c58098 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/pipelines/PythonPipelineSuite.scala @@ -180,6 +180,12 @@ class PythonPipelineSuite } test("flow progress events have correct python source code location") { + // `standalone_flow1` writes to its own dedicated streaming table `st2` (rather than appending + // to `table1`, which already has its own implicit flow). Two flows writing concurrently to the + // same file-based streaming table share a single `_spark_metadata` log and race on the batch 0 + // commit (`ManifestFileCommitProtocol`: "Race while writing batch 0"), which made this test + // flaky. Keeping each flow on a separate destination removes the race without changing what the + // test verifies (source-code-location propagation for an append flow). val unresolvedGraph = buildGraph(pythonText = """ |@dp.table( | comment = 'my table' @@ -197,10 +203,12 @@ class PythonPipelineSuite | return df.select("age") | |@dp.append_flow( - | target = 'table1' + | target = 'st2' |) |def standalone_flow1(): | return spark.readStream.table('mv2') + | + |dp.create_streaming_table('st2') |""".stripMargin) val updateContext = TestPipelineUpdateContext(spark, unresolvedGraph, storageRoot) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala index 66e90ec689131..e098d781d302a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala @@ -78,7 +78,18 @@ class ManifestFileCommitProtocol(jobId: String, path: String) if (fileLog.add(batchId, fileStatuses)) { logInfo(log"Committed batch ${MDC(BATCH_ID, batchId)}") } else { - throw new IllegalStateException(s"Race while writing batch $batchId") + // Reaching here means `fileLog.add` found this batchId already committed to the sink + // metadata log at `path`. This is almost always two concurrent streaming queries writing + // to the same output path: they share a single `_spark_metadata` log and cannot coexist. + // Log the path + batchId at ERROR so a recurrence in scheduled jobs is diagnosable from the + // logs alone, without re-reproducing the race. + logError(log"Race while writing batch ${MDC(BATCH_ID, batchId)} to the file sink metadata " + + log"log at ${MDC(PATH, path)}: another writer already committed this batch. This usually " + + log"means multiple concurrent streaming queries are writing to the same output path.") + throw new IllegalStateException( + s"Race while writing batch $batchId to the file sink metadata log at '$path'. Another " + + "writer already committed this batch, which usually means multiple concurrent streaming " + + "queries are writing to the same output path (they share one _spark_metadata log).") } } diff --git a/sql/core/src/test/resources/sql-tests/results/datetime-formatting.sql.out.java21 b/sql/core/src/test/resources/sql-tests/results/datetime-formatting.sql.out.java21 index 3ed7e08c88bd1..80fb48ec14aa1 100644 --- a/sql/core/src/test/resources/sql-tests/results/datetime-formatting.sql.out.java21 +++ b/sql/core/src/test/resources/sql-tests/results/datetime-formatting.sql.out.java21 @@ -450,3 +450,19 @@ select date_format(timestamp_ntz'2023-08-18 09:13:14.123456', 'yyyy-MM-dd HH:mm: struct -- !query output 2023-08-18 09:13:14.123456 2023-08-18 09:13:14.123456 2023-08-18 09:13:14.123456 + + +-- !query +select to_char(TIME'12:13:14', 'HH:mm:ss'), to_varchar(TIME'12:13:14', 'HH:mm:ss') +-- !query schema +struct +-- !query output +12:13:14 12:13:14 + + +-- !query +select to_char(TIME'23:59:59.123456', 'HH:mm:ss.SSSSSS'), to_varchar(TIME'23:59:59.123456', 'HH:mm:ss.SSSSSS') +-- !query schema +struct +-- !query output +23:59:59.123456 23:59:59.123456 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala index d7f28b79acff4..f861388605e0e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala @@ -20,6 +20,7 @@ import java.io.File import java.time.Duration import org.apache.hadoop.conf.Configuration +import org.scalatest.time.SpanSugar._ import org.apache.spark.io.CompressionCodec import org.apache.spark.sql.{Encoders, Row} @@ -1050,20 +1051,47 @@ class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest .transformWithState(new AggregationStatefulProcessor(), TimeMode.None(), OutputMode.Append()) + + // Block until the async maintenance thread has written the RocksDB snapshot `.zip` for the + // given state-store `version` (matching both `.zip` and checkpoint-v2 `_.zip`) + // on each of `partitions`. Used right after `version` is committed (while it is the current + // version), since maintenance only ever snapshots the current version. On timeout it prints + // the actual directory contents so a recurrence in scheduled jobs is diagnosable. + def waitForStateSnapshot(version: Long, partitions: Seq[Int]): StreamAction = Execute { _ => + val opStateDir = new File(tmpDir, "state/0") + eventually(timeout(60.seconds), interval(100.milliseconds)) { + partitions.foreach { partition => + val partitionDir = new File(opStateDir, partition.toString) + val files = Option(partitionDir.listFiles()) + .map(_.map(_.getName).sorted.toSeq).getOrElse(Seq.empty) + val snapshotUploaded = files.exists(_.matches(s"$version(_.*)?\\.zip")) + assert(snapshotUploaded, + s"Snapshot (version $version) for partition $partition was not uploaded in time. " + + s"Contents of $partitionDir: ${files.mkString("[", ", ", "]")}") + } + } + } + testStream(query)( StartStream(checkpointLocation = tmpDir.getCanonicalPath), AddData(inputData, (1, 1L), (2, 2L), (3, 3L), (4, 4L)), ProcessAllAvailable(), AddData(inputData, (5, 1L), (6, 2L), (7, 3L), (8, 4L)), ProcessAllAvailable(), + // State version 2 is now the current version. The snapshotStartBatchId=1 reader below + // needs the version-2 snapshot, and the asynchronous maintenance thread only creates a + // snapshot for the *current* version. So block here, while version 2 is still current, + // until that snapshot has actually been written - this deterministically forces it to + // exist. (A fixed sleep at the *end* is flaky: once later batches advance the current + // version, maintenance never goes back to snapshot version 2, so it may never appear and + // the reader fails with FileNotFoundException on `2.zip`.) + waitForStateSnapshot(version = 2, partitions = Seq(1, 4)), AddData(inputData, (9, 1L), (10, 2L), (11, 3L), (12, 4L)), ProcessAllAvailable(), AddData(inputData, (13, 1L), (14, 2L), (15, 3L), (16, 4L)), ProcessAllAvailable(), AddData(inputData, (17, 1L), (18, 2L), (19, 3L), (20, 4L)), ProcessAllAvailable(), - // Ensure that we get a chance to upload created snapshots - Execute { _ => Thread.sleep(5000) }, StopStream ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/MetricsFailureInjectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/MetricsFailureInjectionSuite.scala index 6fc784f33815f..cedc6963fa6f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/MetricsFailureInjectionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/MetricsFailureInjectionSuite.scala @@ -572,6 +572,16 @@ class MetricsFailureInjectionSuite // OSS Spark cannot roll back a partially-finished result stage, so the job aborts. With // the default RESULT_STAGE_DELAY=0 the result stage is corrupted before any task // dispatches and the rollback path does not abort. + // + // We group by the high-cardinality `id` column (not `low_cardinality_col`) so that every + // one of the 20 reducer partitions reads data from the corrupted mapper 0. Otherwise only + // the handful of reducer partitions that happen to hold mapper-0's few low-cardinality keys + // would observe the FetchFailed, and the abort would only fire when one of those specific + // partitions happened to be scheduled after the (asynchronous) corruption -- a scheduling + // race that made this test flaky under Maven. With `id`, every partition depends on mapper + // 0, so once RESULT_STAGE_DELAY=1 has corrupted it (after the first result task), local[2] + // dispatches the remaining result tasks afterwards and at least one is guaranteed to hit + // the corrupted mapper, deterministically triggering the indeterminate-stage abort. withTable("test_table") { setUpTestTable("test_table") withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "20") { @@ -580,7 +590,7 @@ class MetricsFailureInjectionSuite config.Tests.INJECT_SHUFFLE_FETCH_FAILURES_RESULT_STAGE_DELAY.key -> "1", config.Tests.INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE.key -> "true") { val df = spark.read.table("test_table") - .groupBy("low_cardinality_col") + .groupBy("id") .count() val ex = intercept[SparkException] { df.collect()