From 2f5259bfb829103f439e5b3b71354b4977687a23 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 24 Jun 2026 14:31:11 +0900 Subject: [PATCH] [SPARK-57656][CONNECT][TESTS] Stabilize flaky SparkSessionE2ESuite interrupt-all tests ### What changes were proposed in this pull request? Make the two `SparkSessionE2ESuite` "interrupt all" tests robust: 1. Run each long-running typed `map` query through a single call site and warm it up once (sleep=0) before any interrupt, so the closure/`TypeTag` artifact classes are already loaded on the executor. Otherwise an interrupt landing during that first-time remote class fetch surfaces as `RemoteClassLoaderError` (`...SparkSessionE2ESuite$$typecreatorNN$1.class`) instead of `OPERATION_CANCELED`. 2. Wrap the foreground-interrupt test body in `try/finally { finished = true }` so a failed assertion cannot leave the background `interruptor` Future running and canceling the operations of subsequent tests (which previously cascaded into ~7 failures across the suite). ### Why are the changes needed? `SparkSessionE2ESuite` intermittently fails in master push Build (SBT) and Maven (Scala 2.13, JDK 21/25). Confirmed flaky (the same module group passes on other runs of the same commit). ### Does this PR introduce any user-facing change? No, test-only. ### How was this patch tested? Re-ran `SparkSessionE2ESuite` 8x plus the full connect module on CI; all green. ### Was this patch authored or co-authored using generative AI tooling? Yes, drafted with Claude Code. Co-authored-by: Isaac --- .../sql/connect/SparkSessionE2ESuite.scala | 46 +++++++++++++++---- 1 file changed, 36 insertions(+), 10 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala index 184868a0df233..9c149a858018a 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala @@ -41,11 +41,20 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { val session = spark import session.implicits._ implicit val ec: ExecutionContextExecutor = ExecutionContext.global + // Run the long-running query through a single call site, and warm it up once before any + // interrupt. Otherwise the very first execution has to ship/fetch the map closure and its + // TypeTag artifact classes to the executor on demand; if an interrupt lands during that + // first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead of + // OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN). + def runMapQuery(sleepMs: Long): Unit = { + spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect() + } + runMapQuery(0) val q1 = Future { - spark.range(10).map(n => { Thread.sleep(30000); n }).collect() + runMapQuery(30000) } val q2 = Future { - spark.range(10).map(n => { Thread.sleep(30000); n }).collect() + runMapQuery(30000) } var q1Interrupted = false var q2Interrupted = false @@ -88,6 +97,16 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { @volatile var finished = false val interrupted = mutable.ListBuffer[String]() + // Run the long-running query through a single call site, and warm it up once before the + // background interruptor starts. Otherwise the first execution has to ship/fetch the map + // closure and its TypeTag artifact classes to the executor on demand; if an interrupt lands + // during that first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead + // of OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN). + def runMapQuery(sleepMs: Long): Unit = { + spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect() + } + runMapQuery(0) + val interruptor = Future { eventually(timeout(20.seconds), interval(1.seconds)) { val ids = spark.interruptAll() @@ -96,15 +115,22 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { } finished } - val e1 = intercept[SparkException] { - spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect() - } - assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1") - val e2 = intercept[SparkException] { - spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect() + try { + val e1 = intercept[SparkException] { + runMapQuery(30.seconds.toMillis) + } + assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1") + val e2 = intercept[SparkException] { + runMapQuery(30.seconds.toMillis) + } + assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2") + } finally { + // Always release the background interruptor. If an assertion above fails, this prevents the + // interruptor Future from continuing to call interruptAll() and canceling operations of the + // subsequent tests in this suite (which previously caused cascading OPERATION_CANCELED + // failures across the whole suite). + finished = true } - assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2") - finished = true assert(awaitResult(interruptor, 10.seconds)) assert(interrupted.length == 2, s"Interrupted operations: $interrupted.") }