Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -568,18 +568,28 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
.format("console")
.start()

// Diagnostic context attached to assertion failures so that, if this test ever flakes again in
// a scheduled job, the log pinpoints which event was missing and the surrounding client state
// (e.g. whether the client-side listener is still registered, whether the query errored). The
// server-side listener can silently self-remove on a send failure, dropping later events.
def diag(stage: String): String =
s"[$stage] q.isActive=${q.isActive}, q.exception=${q.exception}, " +
s"start=${listener.start.size}, progress=${listener.progress.size}, " +
s"terminate=${listener.terminate.size}, " +
s"clientListeners=${spark.streams.listListeners().length}"

try {
q.processAllAvailable()
eventually(timeout(30.seconds)) {
assert(q.isActive)
assert(listener.start.length == 1)
assert(listener.progress.nonEmpty)
assert(q.isActive, diag("active"))
assert(listener.start.length == 1, diag("start"))
assert(listener.progress.nonEmpty, diag("progress"))
}
} finally {
q.stop()
eventually(timeout(60.seconds), interval(1.seconds)) {
assert(!q.isActive)
assert(listener.terminate.nonEmpty)
assert(!q.isActive, diag("stopped"))
assert(listener.terminate.nonEmpty, diag("terminate"))
}
}
}
Expand Down Expand Up @@ -746,9 +756,10 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
}

class MyListener extends StreamingQueryListener {
var start: Seq[String] = Seq.empty
var progress: Seq[String] = Seq.empty
var terminate: Seq[String] = Seq.empty
// @volatile so the test thread reliably observes updates made on the listener dispatch thread.
@volatile var start: Seq[String] = Seq.empty
@volatile var progress: Seq[String] = Seq.empty
@volatile var terminate: Seq[String] = Seq.empty

override def onQueryStarted(event: QueryStartedEvent): Unit = {
start = start :+ event.json
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,19 @@ private[sql] class SparkConnectListenerBusListener(
with Logging {

val sessionHolder = serverSideListenerHolder.sessionHolder

// Number of attempts to transmit an event to the client before giving up. A single transient
// onNext failure (e.g. a momentary gRPC hiccup) should not permanently remove the listener and
// silently drop all subsequent events for the session, including the terminal
// QueryTerminatedEvent; only tear down when sending keeps failing across retries.
private val maxSendAttempts = 3
private val sendRetryBackoffMs = 200L

// The method used to stream back the events to the client.
// The event is serialized to json and sent to the client.
// If any exception is thrown while transmitting back the event, the listener is removed,
// all related sources are cleaned up, and the long-running thread will proceed to send
// the final ResultComplete response.
// If any exception is thrown while transmitting back the event (after a few retries), the
// listener is removed, all related sources are cleaned up, and the long-running thread will
// proceed to send the final ResultComplete response.
private def send(eventJson: String, eventType: StreamingQueryEventType): Unit = {
try {
val event = StreamingQueryListenerEvent
Expand All @@ -117,7 +125,7 @@ private[sql] class SparkConnectListenerBusListener(
.addAllEvents(Array[StreamingQueryListenerEvent](event).toImmutableArraySeq.asJava)
.build()

responseObserver.onNext(
sendWithRetry(
ExecutePlanResponse
.newBuilder()
.setSessionId(sessionHolder.sessionId)
Expand All @@ -136,6 +144,22 @@ private[sql] class SparkConnectListenerBusListener(
}
}

// Sends a response, retrying onNext a bounded number of times on a transient failure. The
// exception from the final attempt propagates to send()'s handler, which removes the listener.
private def sendWithRetry(response: ExecutePlanResponse): Unit = {
var attempt = 1
while (attempt < maxSendAttempts) {
try {
responseObserver.onNext(response)
return
} catch {
case NonFatal(_) => Thread.sleep(sendRetryBackoffMs)
}
attempt += 1
}
responseObserver.onNext(response)
}

def sendResultComplete(): Unit = {
responseObserver
.asInstanceOf[ExecuteResponseObserver[ExecutePlanResponse]]
Expand Down