Skip to content

[DO-NOT-MERGE][CONNECT] Don't drop streaming listener events on a transient send failure#56729

Draft
HyukjinKwon wants to merge 1 commit into
apache:masterfrom
HyukjinKwon:SPARK-57657
Draft

[DO-NOT-MERGE][CONNECT] Don't drop streaming listener events on a transient send failure#56729
HyukjinKwon wants to merge 1 commit into
apache:masterfrom
HyukjinKwon:SPARK-57657

Conversation

@HyukjinKwon

Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

In SparkConnectListenerBusListener.send, retry responseObserver.onNext a small bounded number of
times before tearing the listener down, instead of removing it on the first exception. Also add
diagnostic context to the ClientStreamingQuerySuite."listener events" assertions and make the test
listener's fields @volatile.

Why are the changes needed?

ClientStreamingQuerySuite."listener events" is flaky: the QueryStarted/QueryProgress events
arrive but the terminal QueryTerminatedEvent is never received even though the query has stopped.
The server-side listener removes itself and stops sending all further events on the first
onNext failure, so a single transient gRPC hiccup on a frequent progress event silently drops the
later terminate event. A bounded retry keeps the listener alive across transient failures while still
cleaning up when the client is genuinely unresponsive.

The connect server runs in a separate process whose logs are not captured in CI, so the exact failure
is inferred; the added assertion diagnostics (diag(stage)) surface the client-side state if this
test ever flakes again in a scheduled job, to confirm/refine the root cause.

Before (failing in apache/spark CI): listener events 90s timeout, terminate empty —
https://github.com/apache/spark/actions/runs/28004202389/job/82884598238

After (this change, validated on a fork): full connect module green and
ClientStreamingQuerySuite."listener events" re-run 8x with 0 failures —
https://github.com/HyukjinKwon/spark/actions/runs/28074772169

Does this PR introduce any user-facing change?

No. Server hardening + test diagnostics only.

How was this patch tested?

Re-ran the full connect module and ClientStreamingQuerySuite."listener events" 8x on CI (link
above); all green. Existing SparkConnectListenerBusListenerSuite onNext-throw tests still pass.

Was this patch authored or co-authored using generative AI tooling?

Yes, drafted with Claude Code.

@HyukjinKwon HyukjinKwon left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Automated review (Claude Code, /spark-dev:review) — posted as a comment, not an approval.

Verdict: reasonable hardening; a couple of points worth weighing before merge. No hard blockers, but see the dispatch-thread note.

Correctness / design discussion

  1. send() runs on Spark's async StreamingQueryListenerBus dispatch thread, and the retry now does Thread.sleep(200ms) up to twice on it. That thread is shared across all queries/listeners in the session, so a slow or genuinely-dead client could add up to ~400ms of head-of-line latency to event delivery for other queries before the listener is finally torn down. Worth deciding whether that tradeoff is acceptable vs. e.g. retrying without sleeping, or moving teardown off the dispatch path. (The previous code failed fast on the first exception, so this is a behavior change for the unresponsive-client case.)
  2. Root cause is inferred, not confirmed — the connect server logs aren't captured in CI, so I could not prove a transient onNext failure is what drops the terminate event. The retry is the most likely fix, and the added diag(stage) assertion context + @volatile fields are deliberately there to capture the real state if it recurs in a scheduled job. Flagging so reviewers don't read this as a confirmed-root-cause fix.

Nits (non-blocking)

  • maxSendAttempts = 3 / sendRetryBackoffMs = 200 are magic numbers; fine as-is, but a brief "why these values" note (or config) would help future tuning.
  • Interrupt handling (Thread.currentThread().interrupt(); attempt = maxSendAttempts) correctly stops retrying and falls through to cleanup — good.

Tests

  • The existing SparkConnectListenerBusListenerSuite onNext-throw tests still pass because they throw on every onNext, so retries exhaust and cleanup still happens within their 5s eventually. Confirmed green on the fork run.

Validated: full connect module + "listener events" 8× green on a fork (link in PR description).

…sient send failure

### What changes were proposed in this pull request?

In `SparkConnectListenerBusListener.send`, retry `responseObserver.onNext` a small bounded number
of times before tearing the listener down, instead of removing it on the first exception. Also add
diagnostic context to the `ClientStreamingQuerySuite."listener events"` assertions and make the test
listener's fields `@volatile`.

### Why are the changes needed?

`ClientStreamingQuerySuite."listener events"` is flaky (e.g. Java 21 connect): the
`QueryStarted`/`QueryProgress` events arrive but the terminal `QueryTerminatedEvent` is never
received even though the query has stopped. The server-side listener removes itself and stops
sending all further events on the first `onNext` failure, so a single transient gRPC hiccup on a
frequent progress event silently drops the later terminate event. A bounded retry keeps the listener
alive across transient failures while still cleaning up when the client is genuinely unresponsive.

The connect server runs in a separate process whose logs are not captured in CI, so the exact
failure is inferred; the added assertion diagnostics surface the client-side state if this test ever
flakes again in a scheduled job, to confirm/refine the root cause.

### Does this PR introduce any user-facing change?

No. Server hardening + test diagnostics only.

### How was this patch tested?

Re-ran the full connect module and `ClientStreamingQuerySuite."listener events"` 8x on CI; all green.
Existing `SparkConnectListenerBusListenerSuite` onNext-throw tests still pass.

### Was this patch authored or co-authored using generative AI tooling?

Yes, drafted with Claude Code.

Co-authored-by: Isaac
@HyukjinKwon

Copy link
Copy Markdown
Member Author

Minimized the main (server-side) code change: instead of rewriting send() into a while-loop and switching its logging to plain string interpolation, this keeps the original try/catch and structured log"…" + MDC(...) logging untouched and only wraps onNext in a small bounded-retry helper that falls through to the existing cleanup path on the final failure. Production diff is now ~+20/−4 (was +73/−37); behavior is unchanged (3 attempts, 200ms backoff). Test diagnostics (@volatile + diag(stage)) are unchanged. Force-pushed the single commit.

Note: this version lets an InterruptedException during the backoff sleep propagate (vs. explicitly restoring the interrupt flag); happy to add a one-line guard if preferred.

@HyukjinKwon HyukjinKwon marked this pull request as draft June 24, 2026 10:31
@HyukjinKwon HyukjinKwon changed the title [SPARK-57657][CONNECT] Don't drop streaming listener events on a transient send failure [DO-NOT-MERGE][CONNECT] Don't drop streaming listener events on a transient send failure Jun 24, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant