[DO-NOT-MERGE][CONNECT] Stabilize flaky SparkSessionE2ESuite + ClientStreamingQuerySuite listener events #56716
Closed
HyukjinKwon wants to merge 4 commits into
…ests
### What changes were proposed in this pull request?
Make the two `SparkSessionE2ESuite` "interrupt all" tests robust against two
flakiness sources:
1. Run each long-running typed `map` query through a single call site and warm it
up once (sleep=0) before any interrupt. The first execution of a typed `map`
ships the closure and its `TypeTag` artifact classes and the executor fetches
them on demand; when an `interruptAll()` lands during that first-time remote
class fetch, it surfaces as `RemoteClassLoaderError`
(`...SparkSessionE2ESuite$$typecreatorNN$1.class`) instead of
`OPERATION_CANCELED`, failing the assertion. Warming up loads the classes on the
executor so the interrupted run no longer races a class fetch.
2. Wrap the foreground-interrupt test body in `try/finally { finished = true }` so
that if an assertion fails, the background `interruptor` Future stops instead of
continuing to call `interruptAll()` for up to 20s and canceling the operations of
subsequent tests in the suite (which previously cascaded into many
`OPERATION_CANCELED` failures across the whole suite).
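The second point can be sketched in isolation. This is a minimal illustration rather than the suite's actual code: `Harness`, `interruptCalls`, and the 10 ms polling interval are hypothetical, the counter stands in for `spark.interruptAll()`, and an `AtomicBoolean` plays the role of the `finished` flag.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object InterruptorSketch {
  final class Harness {
    // Plays the role of the `finished` flag in the test.
    val finished = new AtomicBoolean(false)
    // Hypothetical stand-in for spark.interruptAll(): it only counts calls.
    val interruptCalls = new AtomicInteger(0)

    // Background interruptor: loops until `finished` is set or 20s elapse.
    val interruptor: Future[Unit] = Future {
      val deadline = 20.seconds.fromNow
      while (!finished.get() && deadline.hasTimeLeft()) {
        interruptCalls.incrementAndGet()
        Thread.sleep(10)
      }
    }(ExecutionContext.global)

    // Runs the test body; the flag is set even if the body throws.
    def run(body: => Unit): Unit =
      try body
      finally finished.set(true)
  }
}
```

With this shape, a failing assertion inside `run { ... }` still propagates to the test framework, but the interruptor loop exits on its next iteration instead of continuing to cancel operations that belong to later tests.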
### Why are the changes needed?
`SparkSessionE2ESuite` intermittently fails in CI (master push Build and Maven
JDK 21/25): one `RemoteClassLoaderError` in the first interrupt test cascaded into
~7 failures in the suite.
### Does this PR introduce any user-facing change?
No, test-only.
### How was this patch tested?
Re-ran `SparkSessionE2ESuite` repeatedly in CI.
Co-authored-by: Isaac
Adds a focused workflow that builds the connect module once and re-runs only SparkSessionE2ESuite several times to confirm it is no longer flaky, and skips the full scala build matrix for this fork branch so the draft's signal stays focused. Revert both before merging the real fix. Co-authored-by: Isaac
…ilure
### What changes were proposed in this pull request?
In `SparkConnectListenerBusListener.send`, retry the `responseObserver.onNext` a small bounded number of times before tearing the listener down, instead of removing it on the first exception. Also add diagnostic context to the `ClientStreamingQuerySuite."listener events"` assertions and make the test listener's fields `@volatile`.
### Why are the changes needed?
`ClientStreamingQuerySuite."listener events"` is flaky (e.g. Java21 connect run 28004202389): the `QueryStartedEvent`/`QueryProgressEvent` arrive but the terminal `QueryTerminatedEvent` is never received (`listener.terminate` empty after 60s) even though the query has stopped. The server-side listener removes itself and stops sending **all** further events on the first `onNext` failure, so a single transient gRPC hiccup on a (frequent) progress event silently drops the later terminate event. A bounded retry keeps the listener alive across transient failures while still cleaning up when the client is genuinely unresponsive.
Note: the server runs in a separate process whose logs are not captured in CI, so the exact failure is inferred. The added assertion diagnostics (`diag(stage)`) surface the client-side state if this test ever flakes again in a scheduled job, to confirm/refine the root cause.
### Does this PR introduce any user-facing change?
No. Server hardening + test diagnostics only.
### How was this patch tested?
Re-ran the full connect module and `ClientStreamingQuerySuite."listener events"` repeatedly in CI. Existing `SparkConnectListenerBusListenerSuite` onNext-throw tests still pass (the listener is still cleaned up once sending keeps failing).
Co-authored-by: Isaac
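The bounded-retry idea described in this commit message can be sketched without gRPC. Everything below is an assumption made for illustration: `Observer` is a minimal stand-in for the response observer, `Listener` and `FlakyObserver` are hypothetical, and the attempt count is a parameter because the value the patch uses is not stated here.

```scala
import scala.annotation.tailrec
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object BoundedRetrySketch {
  // Minimal stand-in for the gRPC response observer; only onNext matters here.
  trait Observer[T] { def onNext(value: T): Unit }

  // Calls onNext at least once and at most `attemptsLeft` times;
  // true means the event was delivered.
  @tailrec
  def sendWithRetry[T](observer: Observer[T], event: T, attemptsLeft: Int): Boolean = {
    val delivered =
      try { observer.onNext(event); true }
      catch { case NonFatal(_) => false }
    if (delivered) true
    else if (attemptsLeft > 1) sendWithRetry(observer, event, attemptsLeft - 1)
    else false
  }

  // Hypothetical listener: marked removed only after every attempt has failed.
  final class Listener[T](observer: Observer[T], maxAttempts: Int) {
    @volatile private var removed = false
    def isRemoved: Boolean = removed
    def send(event: T): Unit =
      if (!removed && !sendWithRetry(observer, event, maxAttempts)) removed = true
  }

  // Test double: throws on the first `failFirst` calls, then records events.
  final class FlakyObserver[T](failFirst: Int) extends Observer[T] {
    private var calls = 0
    val received: ArrayBuffer[T] = ArrayBuffer.empty[T]
    def onNext(value: T): Unit = {
      calls += 1
      if (calls <= failFirst) throw new RuntimeException("transient failure")
      received += value
    }
  }
}
```

In this sketch, one transient failure on a progress event no longer prevents a later terminate event from being delivered, while an observer that keeps failing still causes the listener to be marked removed after `maxAttempts` tries.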
…listener events
Co-authored-by: Isaac
Superseded by the two clean, non-draft PRs (scaffolding dropped, JIRAs filed):
This draft was only used to validate the fixes on a fork (build connect once + re-run the affected suites 8x).
This draft bundles two independent Spark Connect CI flakiness fixes.

Fix 1: `SparkSessionE2ESuite` interrupt-all tests (test-only)

The foreground typed `.map` ships its closure/`TypeTag` classes as Connect artifacts; the background `interruptAll()` can land during the executor's first-time remote fetch of `$$typecreatorNN`, surfacing as `RemoteClassLoaderError` instead of `OPERATION_CANCELED`. When that assertion failed, the `interruptor` Future was never stopped and kept canceling subsequent tests' operations, causing a cascade of `OPERATION_CANCELED` failures across the suite.

- Warm up each query once before any interrupt, so its classes are already loaded on the executor.
- Wrap the test body in `try/finally { finished = true }` so a failure can't leak the interruptor.

Fix 2: `ClientStreamingQuerySuite."listener events"` lost terminate event

`QueryStartedEvent`/`QueryProgressEvent` arrive but the terminal `QueryTerminatedEvent` is never received (`listener.terminate` empty after 60s) though the query has stopped. The server-side `SparkConnectListenerBusListener.send` removes the listener and stops sending all further events on the first `onNext` failure, so a single transient gRPC hiccup on a frequent progress event silently drops the later terminate event.

- Retry `onNext` a small bounded number of times before tearing the listener down.
- Add `diag(stage)` context to the test assertions and make the test listener fields `@volatile`, so a future scheduled-job recurrence is debuggable (server logs aren't captured in CI, so the exact failure is inferred; these diagnostics confirm/refine it next time).
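The diagnostics part of Fix 2 might look roughly like the sketch below. It is an assumption-laden illustration, not the test's real listener: `RecordingListener` is hypothetical, events are reduced to plain strings, and the body of `diag` is invented here since only its name appears in this PR.

```scala
object ListenerDiagSketch {
  // Hypothetical recording listener; events are reduced to plain strings.
  final class RecordingListener {
    // Written on the listener thread, read on the test thread, hence @volatile.
    @volatile var start: List[String] = Nil
    @volatile var progress: List[String] = Nil
    @volatile var terminate: List[String] = Nil

    def onQueryStarted(event: String): Unit = synchronized { start = start :+ event }
    def onQueryProgress(event: String): Unit = synchronized { progress = progress :+ event }
    def onQueryTerminated(event: String): Unit = synchronized { terminate = terminate :+ event }

    // Snapshot of the client-side state, meant to be used as an assertion message.
    def diag(stage: String): String =
      s"[$stage] start=${start.size} progress=${progress.size} terminate=${terminate.size}"
  }
}
```

A test could then write something like `assert(listener.terminate.nonEmpty, listener.diag("after stop"))`, so that a recurrence reports which events did arrive on the client rather than only that the terminate list was empty.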
### Does this PR introduce any user-facing change?
No (test-only + server hardening).
### How was this patch tested?
Focused fork workflow: rebuild connect, then re-run `SparkSessionE2ESuite` ×8 and `ClientStreamingQuerySuite."listener events"` ×8. Existing `SparkConnectListenerBusListenerSuite` onNext-throw tests still pass.
### Was this patch authored or co-authored using generative AI tooling?
Yes, drafted with Claude Code.