[DO-NOT-MERGE][CONNECT] Don't drop streaming listener events on a transient send failure#56729
HyukjinKwon wants to merge 1 commit into
HyukjinKwon
left a comment
Automated review (Claude Code, /spark-dev:review) — posted as a comment, not an approval.
Verdict: reasonable hardening; a couple of points worth weighing before merge. No hard blockers, but see the dispatch-thread note.
Correctness / design discussion
- `send()` runs on Spark's async `StreamingQueryListenerBus` dispatch thread, and the retry now does `Thread.sleep(200ms)` up to twice on it. That thread is shared across all queries/listeners in the session, so a slow or genuinely-dead client could add up to ~400ms of head-of-line latency to event delivery for other queries before the listener is finally torn down. Worth deciding whether that tradeoff is acceptable vs. e.g. retrying without sleeping, or moving teardown off the dispatch path. (The previous code failed fast on the first exception, so this is a behavior change for the unresponsive-client case.)
- Root cause is inferred, not confirmed: the connect server logs aren't captured in CI, so I could not prove a transient `onNext` failure is what drops the terminate event. The retry is the most likely fix, and the added `diag(stage)` assertion context + `@volatile` fields are deliberately there to capture the real state if it recurs in a scheduled job. Flagging so reviewers don't read this as a confirmed-root-cause fix.
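To make the head-of-line concern concrete, here is a minimal sketch in which a single-threaded executor stands in for the dispatch thread. That stand-in is an assumption for illustration, not Spark's actual listener bus, and `HeadOfLineSketch` and `delayedByMs` are hypothetical names. A task that sleeps on the shared thread delays whatever is queued behind it by at least the sleep time.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object HeadOfLineSketch {
  // Returns how long (in ms) a second, instant task waited behind a task
  // that blocks the single shared thread for `blockMs`.
  def delayedByMs(blockMs: Long): Long = {
    // Assumption: one thread models the shared dispatch thread.
    val dispatch = Executors.newSingleThreadExecutor()
    try {
      val start = System.nanoTime()
      // First "listener": blocks the thread, like a retry that sleeps between attempts.
      dispatch.submit(new Runnable {
        def run(): Unit = Thread.sleep(blockMs)
      })
      // Second "listener": does no work itself, but cannot start until the first is done.
      val elapsed = dispatch.submit(new Callable[Long] {
        def call(): Long = System.nanoTime() - start
      })
      TimeUnit.NANOSECONDS.toMillis(elapsed.get())
    } finally {
      dispatch.shutdown()
    }
  }
}
```

With two 200ms backoffs, `HeadOfLineSketch.delayedByMs(400)` models the worst case described above: the second task observes roughly the full 400ms even though it does no work of its own.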
Nits (non-blocking)
- `maxSendAttempts = 3` / `sendRetryBackoffMs = 200` are magic numbers; fine as-is, but a brief "why these values" note (or config) would help future tuning.
- Interrupt handling (`Thread.currentThread().interrupt(); attempt = maxSendAttempts`) correctly stops retrying and falls through to cleanup. Good.
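For readers without the diff open, the retry shape discussed in these nits can be sketched roughly as follows. This is not the PR's code: `BoundedRetrySketch`, `sendWithRetry`, and the `cleanup` callback are hypothetical names, and only `maxSendAttempts`, `sendRetryBackoffMs`, and the interrupt handling are taken from the review text.

```scala
import scala.util.control.NonFatal

object BoundedRetrySketch {
  // Values quoted in the review; in the PR they belong to the listener itself.
  val maxSendAttempts = 3
  val sendRetryBackoffMs = 200L

  // Hypothetical helper: tries `onNext` up to `maxSendAttempts` times, sleeping
  // between failed attempts, and runs `cleanup` only if every attempt failed.
  def sendWithRetry(onNext: () => Unit, backoffMs: Long = sendRetryBackoffMs)(
      cleanup: () => Unit): Boolean = {
    var attempt = 0
    var sent = false
    while (!sent && attempt < maxSendAttempts) {
      attempt += 1
      try {
        onNext()
        sent = true
      } catch {
        case NonFatal(_) if attempt < maxSendAttempts =>
          // Back off before the next attempt; this is the sleep on the dispatch thread.
          try Thread.sleep(backoffMs)
          catch {
            case _: InterruptedException =>
              // Restore the interrupt flag and stop retrying, as in the review note.
              Thread.currentThread().interrupt()
              attempt = maxSendAttempts
          }
        case NonFatal(_) => () // last attempt failed; fall through to cleanup
      }
    }
    if (!sent) cleanup()
    sent
  }
}
```

With three attempts there are at most two sleeps, which is where the ~400ms worst case comes from. A callback that throws on every call exhausts the attempts and still reaches `cleanup`, which is why tests that throw on every `onNext` keep passing.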
Tests
- The existing `SparkConnectListenerBusListenerSuite` `onNext`-throw tests still pass because they throw on every `onNext`, so retries exhaust and cleanup still happens within their 5s `eventually`. Confirmed green on the fork run.
Validated: full connect module + "listener events" 8× green on a fork (link in PR description).
…sient send failure
### What changes were proposed in this pull request?
In `SparkConnectListenerBusListener.send`, retry `responseObserver.onNext` a small bounded number of times before tearing the listener down, instead of removing it on the first exception. Also add diagnostic context to the `ClientStreamingQuerySuite."listener events"` assertions and make the test listener's fields `@volatile`.
### Why are the changes needed?
`ClientStreamingQuerySuite."listener events"` is flaky (e.g. Java 21 connect): the `QueryStarted`/`QueryProgress` events arrive but the terminal `QueryTerminatedEvent` is never received even though the query has stopped. The server-side listener removes itself and stops sending all further events on the first `onNext` failure, so a single transient gRPC hiccup on a frequent progress event silently drops the later terminate event. A bounded retry keeps the listener alive across transient failures while still cleaning up when the client is genuinely unresponsive.
The connect server runs in a separate process whose logs are not captured in CI, so the exact failure is inferred; the added assertion diagnostics surface the client-side state if this test ever flakes again in a scheduled job, to confirm/refine the root cause.
### Does this PR introduce any user-facing change?
No. Server hardening + test diagnostics only.
### How was this patch tested?
Re-ran the full connect module and `ClientStreamingQuerySuite."listener events"` 8x on CI; all green. Existing `SparkConnectListenerBusListenerSuite` onNext-throw tests still pass.
### Was this patch authored or co-authored using generative AI tooling?
Yes, drafted with Claude Code.
Co-authored-by: Isaac
Minimized the main (server-side) code change: instead of rewriting Note: this version lets an
What changes were proposed in this pull request?
In `SparkConnectListenerBusListener.send`, retry `responseObserver.onNext` a small bounded number of times before tearing the listener down, instead of removing it on the first exception. Also add diagnostic context to the `ClientStreamingQuerySuite."listener events"` assertions and make the test listener's fields `@volatile`.
Why are the changes needed?
`ClientStreamingQuerySuite."listener events"` is flaky: the `QueryStarted`/`QueryProgress` events arrive but the terminal `QueryTerminatedEvent` is never received even though the query has stopped.
The server-side listener removes itself and stops sending all further events on the first `onNext` failure, so a single transient gRPC hiccup on a frequent progress event silently drops the later terminate event. A bounded retry keeps the listener alive across transient failures while still cleaning up when the client is genuinely unresponsive.
The connect server runs in a separate process whose logs are not captured in CI, so the exact failure is inferred; the added assertion diagnostics (`diag(stage)`) surface the client-side state if this test ever flakes again in a scheduled job, to confirm/refine the root cause.
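As a rough illustration of the test-side diagnostics, the sketch below shows a listener-like recorder with `@volatile` fields and a `diag(stage)` helper. Only `diag(stage)`, `@volatile`, and the `terminate` name come from this PR's text; `DiagSketch`, `RecordingListener`, the other field names, and the message format are assumptions made for the example, not the suite's actual code.

```scala
object DiagSketch {
  // Hypothetical stand-in for the test listener in ClientStreamingQuerySuite.
  class RecordingListener {
    // @volatile makes writes from the listener thread visible to the asserting test thread.
    @volatile var started: Seq[String] = Seq.empty
    @volatile var progress: Seq[String] = Seq.empty
    @volatile var terminate: Seq[String] = Seq.empty

    // Summarizes what the client has seen so far, for use as an assertion message.
    def diag(stage: String): String =
      s"[$stage] started=${started.size} progress=${progress.size} terminate=${terminate.size}"
  }
}
```

An assertion such as `assert(listener.terminate.nonEmpty, listener.diag("after stop"))` would then report how many events of each kind had arrived at the point of failure, which is the state a CI log needs to distinguish "terminate never sent" from "nothing sent at all".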
Before (failing in apache/spark CI): `listener events` 90s timeout, `terminate` empty: https://github.com/apache/spark/actions/runs/28004202389/job/82884598238
After (this change, validated on a fork): full connect module green and `ClientStreamingQuerySuite."listener events"` re-run 8x with 0 failures: https://github.com/HyukjinKwon/spark/actions/runs/28074772169
Does this PR introduce any user-facing change?
No. Server hardening + test diagnostics only.
How was this patch tested?
Re-ran the full connect module and `ClientStreamingQuerySuite."listener events"` 8x on CI (link above); all green. Existing `SparkConnectListenerBusListenerSuite` onNext-throw tests still pass.
Was this patch authored or co-authored using generative AI tooling?
Yes, drafted with Claude Code.