[DO-NOT-MERGE] Stabilize failing/flaky CI jobs#56714
Draft
HyukjinKwon wants to merge 10 commits into
Draft
Conversation
…of-bounds contract UTF8String.getByte(int) javadoc states it returns 0 for an invalid byte index, but the implementation performed an unchecked Platform.getByte read, returning adjacent/uninitialized memory for out-of-range indices. Under JDK 25 this surfaced as UTF8StringSuite.testGetByte failing with 'expected 0 but got 47' in the Maven (Scala 2.13, JDK 25) build. Add the bounds check so the method matches its contract deterministically across JDKs. Co-authored-by: Isaac
…g batch 0'
The test 'flow progress events have correct python source code location' defined
two flows writing to the same streaming table 'table1' (its implicit flow plus the
'standalone_flow1' append flow). Both run concurrently and share a single
FileStreamSink '_spark_metadata' commit log, so they race on the batch-0 commit
in ManifestFileCommitProtocol ('Race while writing batch 0'), making the test flaky.
Point 'standalone_flow1' at its own dedicated streaming table 'st2' so each flow
writes to a distinct sink. The append-flow source-code-location assertions are
unchanged (the added create_streaming_table call is placed below them so line
numbers do not shift).
…l.out.java21 golden SPARK-57575 added two TIME-type to_char/to_varchar queries to datetime-formatting.sql and regenerated datetime-formatting.sql.out and datetime-formatting-legacy.sql.out, but not the Java 21 variant datetime-formatting.sql.out.java21. This caused SQLQueryTestSuite to fail on the Maven (Scala 2.13, JDK 21) build: Expected 109, but got 103 blocks in result file 'datetime-formatting.sql.out.java21'. Try regenerating the result files. Append the two new TIME-type query blocks (HH:mm:ss formatting is not locale/JDK sensitive, so outputs match the default golden) so the JDK 21 golden has 36 query blocks like the default. Co-authored-by: Isaac
…ests
### What changes were proposed in this pull request?
Make the two `SparkSessionE2ESuite` "interrupt all" tests robust against two
flakiness sources:
1. Run each long-running typed `map` query through a single call site and warm it
up once (sleep=0) before any interrupt. The first execution of a typed `map`
ships the closure and its `TypeTag` artifact classes and the executor fetches
them on demand; when an `interruptAll()` lands during that first-time remote
class fetch, it surfaces as `RemoteClassLoaderError`
(`...SparkSessionE2ESuite$$typecreatorNN$1.class`) instead of
`OPERATION_CANCELED`, failing the assertion. Warming up loads the classes on the
executor so the interrupted run no longer races a class fetch.
2. Wrap the foreground-interrupt test body in `try/finally { finished = true }` so
that if an assertion fails, the background `interruptor` Future stops instead of
continuing to call `interruptAll()` for up to 20s and canceling the operations of
subsequent tests in the suite (which previously cascaded into many
`OPERATION_CANCELED` failures across the whole suite).
### Why are the changes needed?
`SparkSessionE2ESuite` intermittently fails in CI (master push Build and Maven
JDK 21/25): one `RemoteClassLoaderError` in the first interrupt test cascaded into
~7 failures in the suite.
### Does this PR introduce any user-facing change?
No, test-only.
### How was this patch tested?
Re-ran `SparkSessionE2ESuite` repeatedly in CI.
Co-authored-by: Isaac
…t in MetricsFailureInjectionSuite The test 'Force checksum mismatch aborts a downstream ResultStage' was flaky under Maven (failed ~3/10 scheduled runs; passes on SBT). It grouped by the 5-value 'low_cardinality_col', so only the handful of reducer partitions holding mapper-0's keys read the corrupted mapper. The mapper-0 corruption is applied asynchronously after the first result task succeeds (RESULT_STAGE_DELAY=1); whether the abort fires then depended on whether one of those few partitions was scheduled after the corruption -- a race. Group by the high-cardinality 'id' column instead so every one of the 20 reducer partitions reads mapper-0. With local[2], the remaining result tasks are dispatched only after the first completes (i.e. after the corruption), so at least one always hits the corrupted mapper and the indeterminate-stage abort fires deterministically. Co-authored-by: Isaac
…ate test The 'snapshotStartBatchId with transformWithState' test in StateDataSourceTransformWithStateSuite relied on a fixed Thread.sleep(5000) to give the asynchronous maintenance thread time to upload RocksDB snapshot files before reading state with the snapshotStartBatchId option. Under CI load (observed on Maven Scala 2.13 JDK 21/25) the sleep is sometimes too short, so the snapshot '2.zip' is not yet uploaded when the reader runs: CANNOT_LOAD_STATE_STORE.UNCATEGORIZED ... Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist Replace the fixed sleep with a deterministic eventually() wait that polls until the snapshot (version 2) files for the partitions read by the test (1 and 4) have actually been uploaded. The regex matches both checkpoint v1 (2.zip) and v2 (2_<uniqueId>.zip) naming, covering all suite subclasses. Co-authored-by: Isaac
…race When two writers race on the same FileStreamSink _spark_metadata log, commitJob threw a bare 'Race while writing batch N'. Enrich it: log the sink metadata path and batchId at ERROR and include them (plus the likely cause — concurrent streaming queries writing to the same output path) in the exception message, so a recurrence in scheduled jobs is diagnosable from logs without re-reproducing.
…nup RPC hang A rare CI hang (e.g. pyspark.ml.tests.connect.test_parity_clustering timing out at 450s) traces to a re-entrant ML-cache RPC: while a best-effort cleanup/delete RPC (_cleanup_ml_cache / _delete_ml_cache) is blocked in gRPC with the GIL released, CPython runs a pending RemoteModelRef finalizer (__del__ -> del_remote_cache -> _delete_ml_cache) on the same thread, issuing a second blocking RPC that deadlocks the channel until the process/test timeout. Add a same-thread re-entrancy guard around both ML-cache RPCs. The nested call is redundant (the in-flight RPC is already releasing server state, which is also evicted on session end), so it is skipped and a WARNING is logged -- turning a silent multi-minute hang into a fast, observable signal that pinpoints the cause if it recurs in scheduled jobs. This is a no-regression safety net: in normal operation no ML-cache RPC is in flight when another is issued, so behavior is unchanged. Co-authored-by: Isaac
…snapshot wait When the deterministic snapshot-upload wait times out, surface the actual contents of the partition state directory so a recurrence in scheduled jobs is immediately diagnosable (snapshot still pending vs. cleaned up vs. wrong dir) instead of a bare timeout. Verified deflake holds across 10 consecutive runs (fork CI run 28074724227, 10/10 passed). Co-authored-by: Isaac
…t for tws deflake Supersedes the earlier end-of-stream wait, which could still time out on the avro variant (maintenance only snapshots the current version). Validated both encodings x8 (run 28081816565). Matches standalone PR apache#56721. Co-authored-by: Isaac
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Scope
Stabilize failing/flaky CI jobs (excluding the Pandas 3 jobs the community is
already handling). This is a shared integration branch; commits are focused and
reference the job/area they fix.
Fixes included so far
UTF8String.getByteout-of-bounds contract —getByte(int)documents"if byte index is invalid, returns 0" but did an unchecked
Platform.getByte,returning adjacent memory. Surfaced as
UTF8StringSuite.testGetByteexpected 0 but got 47 in Maven (Scala 2.13, JDK 25). Added the bounds
check so behavior is deterministic across JDKs.
Tracking (other failing lanes, handled separately / in progress)
datetime-formatting.sqlstale.out.java21golden file (Maven JDK21 & JDK25, JDK21 SBT) — regenerate JDK21 golden after SPARK-57575SparkSessionE2ESuiteinterrupt testsRemoteClassLoaderErrorflakinessRace while writing batch 0flakinessMetricsFailureInjectionSuitetiming flakinessThis pull request and its description were written by Isaac.