[DO-NOT-MERGE][CONNECT] Stabilize flaky SparkSessionE2ESuite + ClientStreamingQuerySuite listener events#56716

Closed
HyukjinKwon wants to merge 4 commits into apache:master from HyukjinKwon:ci-fix/connect-sparksession-e2e-flaky

@HyukjinKwon HyukjinKwon commented Jun 24, 2026

[DO-NOT-MERGE] — draft used to stabilize flaky connect CI tests and validate the fixes
on a fork. The two [DO-NOT-MERGE] commits add temporary CI scaffolding (a focused workflow that
rebuilds the connect module and re-runs only the affected suites several times, and skips the full
scala matrix for this fork branch) and must be dropped before any real merge. The two real
fixes are independent and can be split into separate PRs.

This draft bundles two independent Spark Connect CI flakiness fixes.

Fix 1 — SparkSessionE2ESuite interrupt-all tests (test-only)

The foreground typed .map ships its closure/TypeTag classes as Connect artifacts; the background
interruptAll() can land during the executor's first-time remote fetch of $$typecreatorNN,
surfacing as RemoteClassLoaderError instead of OPERATION_CANCELED. When that assertion failed,
the interruptor Future was never stopped and kept canceling subsequent tests' operations, causing a
cascade of OPERATION_CANCELED failures across the suite.

  • Warm up the map closure once (via a single call site) before any interrupt, so its classes are
    already loaded on the executor.
  • Wrap the body in try/finally { finished = true } so a failure can't leak the interruptor.
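
The try/finally guard can be sketched in isolation as below. This is a minimal illustration, not the code from the PR: the `Interruptor` class, its `guard` method, and the 10 ms polling interval are hypothetical, and `interruptAll` is passed in as a plain function standing in for the session's `interruptAll()`. The 20 s deadline mirrors the upper bound mentioned in the commit message.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper: keeps calling `interruptAll` in the background
// until the guarded test body finishes or a 20 s deadline passes.
final class Interruptor(interruptAll: () => Unit) {
  // @volatile so the write in `guard` is visible to the background loop.
  @volatile private var finished = false

  // The background loop starts as soon as the Interruptor is constructed.
  val loop: Future[Unit] = Future {
    val deadline = 20.seconds.fromNow
    while (!finished && deadline.hasTimeLeft()) {
      interruptAll()
      Thread.sleep(10)
    }
  }

  // Runs the test body; the flag is set even if `body` throws,
  // so a failed assertion cannot leak the interruptor into later tests.
  def guard[T](body: => T): T =
    try body
    finally finished = true
}
```

Without the `finally`, an exception thrown by the body would skip the `finished = true` assignment and the loop would keep interrupting until its deadline.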

Fix 2 — ClientStreamingQuerySuite."listener events" lost terminate event

QueryStartedEvent/QueryProgressEvent arrive but the terminal QueryTerminatedEvent is never
received (listener.terminate empty after 60s) though the query has stopped. The server-side
SparkConnectListenerBusListener.send removes the listener and stops sending all further events
on the first onNext failure, so a single transient gRPC hiccup on a frequent progress event
silently drops the later terminate event.

  • Retry onNext a small bounded number of times before tearing the listener down.
  • Add diag(stage) context to the test assertions and make the test listener fields @volatile, so
    that a recurrence in a scheduled job is debuggable. Server logs aren't captured in CI, so the exact
    failure is inferred; these diagnostics should confirm or refine it next time.
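
A bounded retry around `onNext` might look roughly like the following sketch. It is self-contained and does not use the real gRPC or Spark classes: the `Observer` trait is a hypothetical stand-in for the response observer, and the default of 3 attempts is an assumption, since the description only says "a small bounded number of times".

```scala
import scala.annotation.tailrec
import scala.util.control.NonFatal

object BoundedRetrySketch {
  // Hypothetical stand-in for the gRPC response observer; not the real API.
  trait Observer[T] { def onNext(value: T): Unit }

  // Tries to deliver `event` up to `attemptsLeft` times.
  // Returns true once delivered, false when every attempt failed;
  // in the false case the caller would tear the listener down.
  @tailrec
  def sendWithRetry[T](observer: Observer[T], event: T, attemptsLeft: Int = 3): Boolean =
    if (attemptsLeft <= 0) false
    else {
      val delivered =
        try { observer.onNext(event); true }
        catch { case NonFatal(_) => false }
      if (delivered) true
      else sendWithRetry(observer, event, attemptsLeft - 1)
    }
}
```

With this shape, one failed send of a progress event no longer ends the stream, while a client that fails every attempt still triggers cleanup.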

Does this PR introduce any user-facing change?

No (test-only + server hardening).

How was this patch tested?

Focused fork workflow: rebuild connect, then re-run SparkSessionE2ESuite ×8 and
ClientStreamingQuerySuite."listener events" ×8. Existing SparkConnectListenerBusListenerSuite
onNext-throw tests still pass.

Was this patch authored or co-authored using generative AI tooling?

Yes, drafted with Claude Code.

…ests

### What changes were proposed in this pull request?

Make the two `SparkSessionE2ESuite` "interrupt all" tests robust against two
flakiness sources:

1. Run each long-running typed `map` query through a single call site and warm it
   up once (sleep=0) before any interrupt. The first execution of a typed `map`
   ships the closure and its `TypeTag` artifact classes and the executor fetches
   them on demand; when an `interruptAll()` lands during that first-time remote
   class fetch, it surfaces as `RemoteClassLoaderError`
   (`...SparkSessionE2ESuite$$typecreatorNN$1.class`) instead of
   `OPERATION_CANCELED`, failing the assertion. Warming up loads the classes on the
   executor so the interrupted run no longer races a class fetch.

2. Wrap the foreground-interrupt test body in `try/finally { finished = true }` so
   that if an assertion fails, the background `interruptor` Future stops instead of
   continuing to call `interruptAll()` for up to 20s and canceling the operations of
   subsequent tests in the suite (which previously cascaded into many
   `OPERATION_CANCELED` failures across the whole suite).
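
The race described in item 1 can be pictured with a deterministic toy model. Nothing here is Spark code: `ToyExecutor` and its `run` method are hypothetical, and the model simply assumes that an interrupt arriving while a class still needs its first remote fetch surfaces as a class-loading error, whereas the real failure is timing dependent.

```scala
import scala.collection.mutable

object WarmUpSketch {
  // Toy model of an executor that fetches closure classes remotely on first use.
  final class ToyExecutor {
    private val loadedClasses = mutable.Set.empty[String]

    // Returns the outcome of one run as a label:
    // "OK", "OPERATION_CANCELED", or "RemoteClassLoaderError".
    def run(closureClass: String, interruptDuringRun: Boolean): String = {
      val needsRemoteFetch = !loadedClasses.contains(closureClass)
      if (interruptDuringRun && needsRemoteFetch) {
        // The interrupt lands mid-fetch, so the class never finishes loading.
        "RemoteClassLoaderError"
      } else {
        loadedClasses += closureClass
        if (interruptDuringRun) "OPERATION_CANCELED" else "OK"
      }
    }
  }
}
```

In this model, an uninterrupted warm-up run first makes a later interrupted run report `OPERATION_CANCELED`, which is the outcome the test asserts on.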

### Why are the changes needed?

`SparkSessionE2ESuite` intermittently fails in CI (master push Build and Maven
JDK 21/25): one `RemoteClassLoaderError` in the first interrupt test cascaded into
~7 failures in the suite.

### Does this PR introduce any user-facing change?

No, test-only.

### How was this patch tested?

Re-ran `SparkSessionE2ESuite` repeatedly in CI.

Co-authored-by: Isaac
Adds a focused workflow that builds the connect module once and re-runs only
SparkSessionE2ESuite several times to confirm it is no longer flaky, and skips the
full scala build matrix for this fork branch so the draft's signal stays focused.
Revert both before merging the real fix.

Co-authored-by: Isaac
…ilure

### What changes were proposed in this pull request?

In `SparkConnectListenerBusListener.send`, retry the `responseObserver.onNext` a small bounded
number of times before tearing the listener down, instead of removing it on the first exception.
Also add diagnostic context to the `ClientStreamingQuerySuite."listener events"` assertions and make
the test listener's fields `@volatile`.

### Why are the changes needed?

`ClientStreamingQuerySuite."listener events"` is flaky (e.g. Java21 connect run 28004202389): the
`QueryStartedEvent`/`QueryProgressEvent` arrive but the terminal `QueryTerminatedEvent` is never
received (`listener.terminate` empty after 60s) even though the query has stopped. The server-side
listener removes itself and stops sending **all** further events on the first `onNext` failure, so a
single transient gRPC hiccup on a (frequent) progress event silently drops the later terminate event.
A bounded retry keeps the listener alive across transient failures while still cleaning up when the
client is genuinely unresponsive.

Note: the server runs in a separate process whose logs are not captured in CI, so the exact failure
is inferred. The added assertion diagnostics (`diag(stage)`) surface the client-side state if this
test ever flakes again in a scheduled job, to confirm/refine the root cause.
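
As a rough illustration of the diagnostics idea, the sketch below builds an assertion message from the listener's state. The `RecordingListener` class, its field names, and the extra parameters of `diag` are hypothetical; the PR only names `diag(stage)` and `@volatile` listener fields.

```scala
object ListenerDiagSketch {
  // Hypothetical test listener. The fields are @volatile because the
  // event-delivery thread writes them while the test thread reads them.
  final class RecordingListener {
    @volatile var started: Option[String] = None
    @volatile var progressCount: Int = 0
    @volatile var terminated: Option[String] = None
  }

  // Summarizes client-side state so a failed assertion explains itself.
  def diag(stage: String, listener: RecordingListener, queryActive: Boolean): String =
    s"[stage=$stage] started=${listener.started.isDefined} " +
      s"progress=${listener.progressCount} " +
      s"terminated=${listener.terminated.isDefined} queryActive=$queryActive"
}
```

An assertion could then pass the message as its clue, for example `assert(listener.terminated.nonEmpty, diag("after-stop", listener, queryActive = false))`.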

### Does this PR introduce any user-facing change?

No. Server hardening + test diagnostics only.

### How was this patch tested?

Re-ran the full connect module and `ClientStreamingQuerySuite."listener events"` repeatedly in CI.
Existing `SparkConnectListenerBusListenerSuite` onNext-throw tests still pass (the listener is still
cleaned up once sending keeps failing).

Co-authored-by: Isaac
HyukjinKwon changed the title from "[DO-NOT-MERGE][CONNECT][TESTS] Stabilize flaky SparkSessionE2ESuite interrupt-all tests" to "[DO-NOT-MERGE][CONNECT] Stabilize flaky SparkSessionE2ESuite + ClientStreamingQuerySuite listener events" on Jun 24, 2026

HyukjinKwon (Member, Author) commented:

Superseded by the two clean, non-draft PRs (scaffolding dropped, JIRAs filed):

This draft was only used to validate the fixes on a fork (build connect once + re-run the affected suites 8x).
