diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala index 184868a0df233..9c149a858018a 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala @@ -41,11 +41,20 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { val session = spark import session.implicits._ implicit val ec: ExecutionContextExecutor = ExecutionContext.global + // Run the long-running query through a single call site, and warm it up once before any + // interrupt. Otherwise the very first execution has to ship/fetch the map closure and its + // TypeTag artifact classes to the executor on demand; if an interrupt lands during that + // first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead of + // OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN). + def runMapQuery(sleepMs: Long): Unit = { + spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect() + } + runMapQuery(0) val q1 = Future { - spark.range(10).map(n => { Thread.sleep(30000); n }).collect() + runMapQuery(30000) } val q2 = Future { - spark.range(10).map(n => { Thread.sleep(30000); n }).collect() + runMapQuery(30000) } var q1Interrupted = false var q2Interrupted = false @@ -88,6 +97,16 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { @volatile var finished = false val interrupted = mutable.ListBuffer[String]() + // Run the long-running query through a single call site, and warm it up once before the + // background interruptor starts. Otherwise the first execution has to ship/fetch the map + // closure and its TypeTag artifact classes to the executor on demand; if an interrupt lands + // during that first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead + // of OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN). + def runMapQuery(sleepMs: Long): Unit = { + spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect() + } + runMapQuery(0) + val interruptor = Future { eventually(timeout(20.seconds), interval(1.seconds)) { val ids = spark.interruptAll() @@ -96,15 +115,22 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession { } finished } - val e1 = intercept[SparkException] { - spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect() - } - assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1") - val e2 = intercept[SparkException] { - spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect() + try { + val e1 = intercept[SparkException] { + runMapQuery(30.seconds.toMillis) + } + assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1") + val e2 = intercept[SparkException] { + runMapQuery(30.seconds.toMillis) + } + assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2") + } finally { + // Always release the background interruptor. If an assertion above fails, this prevents the + // interruptor Future from continuing to call interruptAll() and canceling operations of the + // subsequent tests in this suite (which previously caused cascading OPERATION_CANCELED + // failures across the whole suite). + finished = true } - assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2") - finished = true assert(awaitResult(interruptor, 10.seconds)) assert(interrupted.length == 2, s"Interrupted operations: $interrupted.") }