Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,20 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
val session = spark
import session.implicits._
implicit val ec: ExecutionContextExecutor = ExecutionContext.global
// Run the long-running query through a single call site, and warm it up once before any
// interrupt. Otherwise the very first execution has to ship/fetch the map closure and its
// TypeTag artifact classes to the executor on demand; if an interrupt lands during that
// first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead of
// OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN).
def runMapQuery(sleepMs: Long): Unit = {
spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect()
}
runMapQuery(0)
val q1 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
runMapQuery(30000)
}
val q2 = Future {
spark.range(10).map(n => { Thread.sleep(30000); n }).collect()
runMapQuery(30000)
}
var q1Interrupted = false
var q2Interrupted = false
Expand Down Expand Up @@ -88,6 +97,16 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
@volatile var finished = false
val interrupted = mutable.ListBuffer[String]()

// Run the long-running query through a single call site, and warm it up once before the
// background interruptor starts. Otherwise the first execution has to ship/fetch the map
// closure and its TypeTag artifact classes to the executor on demand; if an interrupt lands
// during that first-time remote class fetch, it surfaces as a RemoteClassLoaderError instead
// of OPERATION_CANCELED, making this test flaky (see SparkSessionE2ESuite$$typecreatorNN).
def runMapQuery(sleepMs: Long): Unit = {
spark.range(10).map(n => { Thread.sleep(sleepMs); n }).collect()
}
runMapQuery(0)

val interruptor = Future {
eventually(timeout(20.seconds), interval(1.seconds)) {
val ids = spark.interruptAll()
Expand All @@ -96,15 +115,22 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
}
finished
}
val e1 = intercept[SparkException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
}
assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1")
val e2 = intercept[SparkException] {
spark.range(10).map(n => { Thread.sleep(30.seconds.toMillis); n }).collect()
try {
val e1 = intercept[SparkException] {
runMapQuery(30.seconds.toMillis)
}
assert(e1.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e1")
val e2 = intercept[SparkException] {
runMapQuery(30.seconds.toMillis)
}
assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2")
} finally {
// Always release the background interruptor. If an assertion above fails, this prevents the
// interruptor Future from continuing to call interruptAll() and canceling operations of the
// subsequent tests in this suite (which previously caused cascading OPERATION_CANCELED
// failures across the whole suite).
finished = true
}
assert(e2.getMessage.contains("OPERATION_CANCELED"), s"Unexpected exception: $e2")
finished = true
assert(awaitResult(interruptor, 10.seconds))
assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
}
Expand Down