From 2ab9b53c9255443be7743caf4511d6cd4a5dacf5 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 24 Jun 2026 14:31:12 +0900 Subject: [PATCH] [SPARK-57657][CONNECT] Don't drop streaming listener events on a transient send failure ### What changes were proposed in this pull request? In `SparkConnectListenerBusListener.send`, retry `responseObserver.onNext` a small bounded number of times before tearing the listener down, instead of removing it on the first exception. Also add diagnostic context to the `ClientStreamingQuerySuite."listener events"` assertions and make the test listener's fields `@volatile`. ### Why are the changes needed? `ClientStreamingQuerySuite."listener events"` is flaky (e.g. Java 21 connect): the `QueryStarted`/`QueryProgress` events arrive but the terminal `QueryTerminatedEvent` is never received even though the query has stopped. The server-side listener removes itself and stops sending all further events on the first `onNext` failure, so a single transient gRPC hiccup on a frequent progress event silently drops the later terminate event. A bounded retry keeps the listener alive across transient failures while still cleaning up when the client is genuinely unresponsive. The connect server runs in a separate process whose logs are not captured in CI, so the exact failure is inferred; the added assertion diagnostics surface the client-side state if this test ever flakes again in a scheduled job, to confirm/refine the root cause. ### Does this PR introduce any user-facing change? No. Server hardening + test diagnostics only. ### How was this patch tested? Re-ran the full connect module and `ClientStreamingQuerySuite."listener events"` 8x on CI; all green. Existing `SparkConnectListenerBusListenerSuite` onNext-throw tests still pass. ### Was this patch authored or co-authored using generative AI tooling? Yes, drafted with Claude Code. Co-authored-by: Isaac --- .../streaming/ClientStreamingQuerySuite.scala | 27 +++++++++++----- .../SparkConnectListenerBusListener.scala | 32 ++++++++++++++++--- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala index d9d2498393c63..5574ed6830c68 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala @@ -568,18 +568,28 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L .format("console") .start() + // Diagnostic context attached to assertion failures so that, if this test ever flakes again in + // a scheduled job, the log pinpoints which event was missing and the surrounding client state + // (e.g. whether the client-side listener is still registered, whether the query errored). The + // server-side listener can silently self-remove on a send failure, dropping later events. + def diag(stage: String): String = + s"[$stage] q.isActive=${q.isActive}, q.exception=${q.exception}, " + + s"start=${listener.start.size}, progress=${listener.progress.size}, " + + s"terminate=${listener.terminate.size}, " + + s"clientListeners=${spark.streams.listListeners().length}" + try { q.processAllAvailable() eventually(timeout(30.seconds)) { - assert(q.isActive) - assert(listener.start.length == 1) - assert(listener.progress.nonEmpty) + assert(q.isActive, diag("active")) + assert(listener.start.length == 1, diag("start")) + assert(listener.progress.nonEmpty, diag("progress")) } } finally { q.stop() eventually(timeout(60.seconds), interval(1.seconds)) { - assert(!q.isActive) - assert(listener.terminate.nonEmpty) + assert(!q.isActive, diag("stopped")) + assert(listener.terminate.nonEmpty, diag("terminate")) } } } @@ -746,9 +756,10 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L } class MyListener extends StreamingQueryListener { - var start: Seq[String] = Seq.empty - var progress: Seq[String] = Seq.empty - var terminate: Seq[String] = Seq.empty + // @volatile so the test thread reliably observes updates made on the listener dispatch thread. + @volatile var start: Seq[String] = Seq.empty + @volatile var progress: Seq[String] = Seq.empty + @volatile var terminate: Seq[String] = Seq.empty override def onQueryStarted(event: QueryStartedEvent): Unit = { start = start :+ event.json diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala index 91fe395f520d8..468041820f868 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala @@ -99,11 +99,19 @@ private[sql] class SparkConnectListenerBusListener( with Logging { val sessionHolder = serverSideListenerHolder.sessionHolder + + // Number of attempts to transmit an event to the client before giving up. A single transient + // onNext failure (e.g. a momentary gRPC hiccup) should not permanently remove the listener and + // silently drop all subsequent events for the session, including the terminal + // QueryTerminatedEvent; only tear down when sending keeps failing across retries. + private val maxSendAttempts = 3 + private val sendRetryBackoffMs = 200L + // The method used to stream back the events to the client. // The event is serialized to json and sent to the client. - // If any exception is thrown while transmitting back the event, the listener is removed, - // all related sources are cleaned up, and the long-running thread will proceed to send - // the final ResultComplete response. + // If any exception is thrown while transmitting back the event (after a few retries), the + // listener is removed, all related sources are cleaned up, and the long-running thread will + // proceed to send the final ResultComplete response. private def send(eventJson: String, eventType: StreamingQueryEventType): Unit = { try { val event = StreamingQueryListenerEvent @@ -117,7 +125,7 @@ private[sql] class SparkConnectListenerBusListener( .addAllEvents(Array[StreamingQueryListenerEvent](event).toImmutableArraySeq.asJava) .build() - responseObserver.onNext( + sendWithRetry( ExecutePlanResponse .newBuilder() .setSessionId(sessionHolder.sessionId) @@ -136,6 +144,22 @@ private[sql] class SparkConnectListenerBusListener( } } + // Sends a response, retrying onNext a bounded number of times on a transient failure. The + // exception from the final attempt propagates to send()'s handler, which removes the listener. + private def sendWithRetry(response: ExecutePlanResponse): Unit = { + var attempt = 1 + while (attempt < maxSendAttempts) { + try { + responseObserver.onNext(response) + return + } catch { + case NonFatal(_) => Thread.sleep(sendRetryBackoffMs) + } + attempt += 1 + } + responseObserver.onNext(response) + } + def sendResultComplete(): Unit = { responseObserver .asInstanceOf[ExecuteResponseObserver[ExecutePlanResponse]]