
[DO-NOT-MERGE] Stabilize failing/flaky CI jobs #56714

Draft
HyukjinKwon wants to merge 10 commits into apache:master from HyukjinKwon:ci-green-2026-06

@HyukjinKwon

Member

[DO-NOT-MERGE] Verification-only PR. Do not merge. Opened as a draft to
run CI on a batch of fixes that stabilize currently failing / flaky
apache/spark master scheduled jobs. Individual fixes will be sent as
separate, properly-attributed PRs.

Scope

Stabilize failing/flaky CI jobs (excluding the Pandas 3 jobs the community is
already handling). This is a shared integration branch; commits are focused and
reference the job/area they fix.

Fixes included so far

  • UTF8String.getByte out-of-bounds contract: getByte(int) documents
    "if byte index is invalid, returns 0" but performed an unchecked
    Platform.getByte read, returning adjacent memory. Surfaced as
    UTF8StringSuite.testGetByte failing with "expected 0 but got 47" in
    Maven (Scala 2.13, JDK 25). Added the bounds check so behavior is
    deterministic across JDKs.

Tracking (other failing lanes, handled separately / in progress)

  • datetime-formatting.sql: stale .out.java21 golden file (Maven JDK21 & JDK25, JDK21 SBT); regenerate the JDK21 golden after SPARK-57575
  • spark-connect SparkSessionE2ESuite interrupt tests: RemoteClassLoaderError flakiness
  • pyspark connect pipelines: "Race while writing batch 0" flakiness
  • MetricsFailureInjectionSuite: timing flakiness
  • branch-4.2: Maven/Build JDK21/JDK25 failures

This pull request and its description were written by Isaac.

HyukjinKwon and others added 10 commits June 24, 2026 09:08
…of-bounds contract

UTF8String.getByte(int) javadoc states it returns 0 for an invalid byte
index, but the implementation performed an unchecked Platform.getByte read,
returning adjacent/uninitialized memory for out-of-range indices. Under JDK
25 this surfaced as UTF8StringSuite.testGetByte failing with 'expected 0 but
got 47' in the Maven (Scala 2.13, JDK 25) build. Add the bounds check so the
method matches its contract deterministically across JDKs.
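
For illustration only, the difference between the unchecked read and the documented contract can be modeled in a few lines of Python. This is a sketch, not the Java change itself: the function names, the `backing` buffer, and the trailing `/` byte (ASCII 47, chosen only to echo the failure message) are all hypothetical, and signed-byte handling is ignored.

```python
def get_byte_unchecked(backing: bytes, index: int) -> int:
    # Models an unchecked read: any byte inside the backing buffer is returned,
    # even one that lies past the string's logical length.
    return backing[index]


def get_byte_checked(backing: bytes, num_bytes: int, index: int) -> int:
    # Models the documented contract: an invalid byte index yields 0.
    if index < 0 or index >= num_bytes:
        return 0
    return backing[index]


# Hypothetical layout: the 2-byte string "ab" followed by an unrelated byte.
backing = b"ab/"
num_bytes = 2

unchecked = get_byte_unchecked(backing, 2)          # reads the neighbor byte
checked = get_byte_checked(backing, num_bytes, 2)   # honors the contract
```

With the check in place the result no longer depends on whatever happens to sit next to the string in memory, which is the property the test relies on.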

Co-authored-by: Isaac
…g batch 0'

The test 'flow progress events have correct python source code location' defined
two flows writing to the same streaming table 'table1' (its implicit flow plus the
'standalone_flow1' append flow). Both run concurrently and share a single
FileStreamSink '_spark_metadata' commit log, so they race on the batch-0 commit
in ManifestFileCommitProtocol ('Race while writing batch 0'), making the test flaky.

Point 'standalone_flow1' at its own dedicated streaming table 'st2' so each flow
writes to a distinct sink. The append-flow source-code-location assertions are
unchanged (the added create_streaming_table call is placed below them so line
numbers do not shift).
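
The race described above can be approximated with a toy commit log in which each batch id may be committed only once. This is a simplified model under stated assumptions, not Spark's sink log: `CommitLog`, `commit_job`, and `run_flows` are hypothetical names, and the flows run sequentially here, so the loser is fixed, whereas in the real test the two flows run concurrently and either one may lose.

```python
class CommitLog:
    """Toy commit log: each batch id can be committed at most once."""

    def __init__(self):
        self._entries = {}

    def add(self, batch_id, files):
        # Returns False when another writer already committed this batch.
        if batch_id in self._entries:
            return False
        self._entries[batch_id] = list(files)
        return True


def commit_job(log, batch_id, files):
    if not log.add(batch_id, files):
        raise RuntimeError(f"Race while writing batch {batch_id}")


def run_flows(logs_by_flow):
    """Commit batch 0 for each flow; return the flows that lost the race."""
    losers = []
    for flow, log in logs_by_flow.items():
        try:
            commit_job(log, 0, [f"{flow}-part-0"])
        except RuntimeError:
            losers.append(flow)
    return losers


# Both flows share one log: the second batch-0 commit is rejected.
shared = CommitLog()
shared_result = run_flows({"table1": shared, "standalone_flow1": shared})

# Each flow has its own log: both batch-0 commits succeed.
separate_result = run_flows(
    {"table1": CommitLog(), "standalone_flow1": CommitLog()}
)
```

Giving each flow its own sink removes the shared batch-0 entry, so there is nothing left to race on.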
…l.out.java21 golden

SPARK-57575 added two TIME-type to_char/to_varchar queries to
datetime-formatting.sql and regenerated datetime-formatting.sql.out and
datetime-formatting-legacy.sql.out, but not the Java 21 variant
datetime-formatting.sql.out.java21. This caused SQLQueryTestSuite to fail
on the Maven (Scala 2.13, JDK 21) build:

  Expected 109, but got 103 blocks in result file
  'datetime-formatting.sql.out.java21'. Try regenerating the result files.

Append the two new TIME-type query blocks (HH:mm:ss formatting is not
locale/JDK sensitive, so outputs match the default golden) so the JDK 21
golden has 36 query blocks like the default.
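
The two numbers in the error message are consistent with a golden-file layout of one header block plus three blocks per query. That layout is an assumption inferred from the figures quoted above, not something stated in this commit; the arithmetic can be checked as follows.

```python
def expected_blocks(num_queries, blocks_per_query=3, header_blocks=1):
    # Assumed layout: one header block, then three blocks per query.
    return num_queries * blocks_per_query + header_blocks


default_golden = expected_blocks(36)   # default golden, 36 queries
stale_java21 = expected_blocks(34)     # JDK 21 golden missing two queries
missing_blocks = default_golden - stale_java21
```

Under that assumption the six missing blocks correspond exactly to the two TIME-type queries that were not appended to the JDK 21 variant.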

Co-authored-by: Isaac
…ests

### What changes were proposed in this pull request?

Make the two `SparkSessionE2ESuite` "interrupt all" tests robust against two
flakiness sources:

1. Run each long-running typed `map` query through a single call site and warm it
   up once (sleep=0) before any interrupt. The first execution of a typed `map`
   ships the closure and its `TypeTag` artifact classes and the executor fetches
   them on demand; when an `interruptAll()` lands during that first-time remote
   class fetch, it surfaces as `RemoteClassLoaderError`
   (`...SparkSessionE2ESuite$$typecreatorNN$1.class`) instead of
   `OPERATION_CANCELED`, failing the assertion. Warming up loads the classes on the
   executor so the interrupted run no longer races a class fetch.

2. Wrap the foreground-interrupt test body in `try/finally { finished = true }` so
   that if an assertion fails, the background `interruptor` Future stops instead of
   continuing to call `interruptAll()` for up to 20s and canceling the operations of
   subsequent tests in the suite (which previously cascaded into many
   `OPERATION_CANCELED` failures across the whole suite).
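
The second point can be sketched in Python as follows. This is an illustrative analogue of the `try/finally` pattern, not the Scala test code: `run_with_interruptor` and `failing_body` are hypothetical, and a `threading.Event` stands in for the `finished` flag.

```python
import threading
import time


def run_with_interruptor(body, interrupt_all, interval_s=0.01, max_s=20.0):
    finished = threading.Event()

    def interruptor():
        # Keep interrupting until the body finishes or the time budget runs out.
        deadline = time.monotonic() + max_s
        while not finished.is_set() and time.monotonic() < deadline:
            interrupt_all()
            finished.wait(interval_s)

    worker = threading.Thread(target=interruptor, daemon=True)
    worker.start()
    try:
        return body()
    finally:
        # Runs even when the body raises, so the interruptor always stops.
        finished.set()
        worker.join()


calls = []


def failing_body():
    time.sleep(0.05)
    raise AssertionError("simulated assertion failure")


try:
    run_with_interruptor(failing_body, lambda: calls.append("interruptAll"))
except AssertionError:
    pass

calls_at_exit = len(calls)
time.sleep(0.05)
calls_after = len(calls)  # unchanged: no interrupts leak into later work
```

Without the `finally`, a failed assertion would leave the background loop running for the rest of its time budget, which is the cascade described above.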

### Why are the changes needed?

`SparkSessionE2ESuite` intermittently fails in CI (master push Build and Maven
JDK 21/25): one `RemoteClassLoaderError` in the first interrupt test cascaded into
~7 failures in the suite.

### Does this PR introduce any user-facing change?

No, test-only.

### How was this patch tested?

Re-ran `SparkSessionE2ESuite` repeatedly in CI.

Co-authored-by: Isaac
…t in MetricsFailureInjectionSuite

The test 'Force checksum mismatch aborts a downstream ResultStage' was flaky under
Maven (failed ~3/10 scheduled runs; passes on SBT). It grouped by the 5-value
'low_cardinality_col', so only the handful of reducer partitions holding mapper-0's
keys read the corrupted mapper. The mapper-0 corruption is applied asynchronously
after the first result task succeeds (RESULT_STAGE_DELAY=1); whether the abort fires
then depended on whether one of those few partitions was scheduled after the
corruption -- a race.

Group by the high-cardinality 'id' column instead so every one of the 20 reducer
partitions reads mapper-0. With local[2], the remaining result tasks are dispatched
only after the first completes (i.e. after the corruption), so at least one always
hits the corrupted mapper and the indeterminate-stage abort fires deterministically.
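
The effect of key cardinality on how many reducer partitions receive data can be shown with a small counting sketch. The partitioner here is a stand-in (Python's `hash` of a small non-negative int is the int itself), and the 1000 row ids are hypothetical; Spark's actual hash function differs, but the bound of at most one partition per distinct key holds either way.

```python
NUM_REDUCERS = 20


def touched_partitions(keys, num_partitions=NUM_REDUCERS):
    # Stand-in hash partitioner: each distinct key lands in exactly one partition.
    return {hash(k) % num_partitions for k in keys}


rows = range(1000)  # hypothetical row ids

# Grouping by a 5-value column can populate at most 5 reducer partitions.
low_cardinality = touched_partitions(i % 5 for i in rows)

# Grouping by a high-cardinality id populates every reducer partition.
high_cardinality = touched_partitions(rows)
```

When every reducer partition has data from the corrupted mapper, it no longer matters which partition happens to be scheduled after the corruption.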

Co-authored-by: Isaac
…ate test

The 'snapshotStartBatchId with transformWithState' test in
StateDataSourceTransformWithStateSuite relied on a fixed Thread.sleep(5000)
to give the asynchronous maintenance thread time to upload RocksDB snapshot
files before reading state with the snapshotStartBatchId option. Under CI load
(observed on Maven Scala 2.13 JDK 21/25) the sleep is sometimes too short, so
the snapshot '2.zip' is not yet uploaded when the reader runs:

  CANNOT_LOAD_STATE_STORE.UNCATEGORIZED ...
  Caused by: java.io.FileNotFoundException: .../state/0/1/2.zip does not exist

Replace the fixed sleep with a deterministic eventually() wait that polls until
the snapshot (version 2) files for the partitions read by the test (1 and 4)
have actually been uploaded. The regex matches both checkpoint v1 (2.zip) and
v2 (2_<uniqueId>.zip) naming, covering all suite subclasses.
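
A rough Python analogue of the polling wait is sketched below. It is not the Scala `eventually()` code from the test: the regex, `snapshot_uploaded`, and `wait_until` are hypothetical, and the temporary directory only mimics the two partition directories mentioned above.

```python
import os
import re
import tempfile
import time

# Hypothetical pattern covering both "2.zip" and "2_<uniqueId>.zip".
SNAPSHOT_V2 = re.compile(r"2(_.+)?\.zip")


def snapshot_uploaded(partition_dir):
    try:
        names = os.listdir(partition_dir)
    except FileNotFoundError:
        return False
    return any(SNAPSHOT_V2.fullmatch(name) for name in names)


def wait_until(condition, timeout_s=10.0, interval_s=0.05):
    # Poll until the condition holds instead of sleeping for a fixed time.
    deadline = time.monotonic() + timeout_s
    while not condition():
        if time.monotonic() >= deadline:
            raise TimeoutError("condition not met within %.1fs" % timeout_s)
        time.sleep(interval_s)


with tempfile.TemporaryDirectory() as state_dir:
    dirs = [os.path.join(state_dir, p) for p in ("1", "4")]
    for d, name in zip(dirs, ("2.zip", "2_abc123.zip")):
        os.makedirs(d)
        open(os.path.join(d, name), "w").close()
    wait_until(lambda: all(snapshot_uploaded(d) for d in dirs), timeout_s=1.0)
    uploaded = [snapshot_uploaded(d) for d in dirs]
```

Polling returns as soon as the files exist and only fails after the full timeout, so it is both faster on an idle machine and more tolerant on a loaded one than a fixed sleep.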

Co-authored-by: Isaac
…race

When two writers race on the same FileStreamSink _spark_metadata log, commitJob
threw a bare 'Race while writing batch N'. Enrich it: log the sink metadata path
and batchId at ERROR and include them in the exception message, along with the
likely cause (concurrent streaming queries writing to the same output path), so a
recurrence in scheduled jobs is diagnosable from logs without re-reproducing.
…nup RPC hang

A rare CI hang (e.g. pyspark.ml.tests.connect.test_parity_clustering timing out at 450s)
traces to a re-entrant ML-cache RPC: while a best-effort cleanup/delete RPC
(_cleanup_ml_cache / _delete_ml_cache) is blocked in gRPC with the GIL released, CPython
runs a pending RemoteModelRef finalizer (__del__ -> del_remote_cache -> _delete_ml_cache)
on the same thread, issuing a second blocking RPC that deadlocks the channel until the
process/test timeout.

Add a same-thread re-entrancy guard around both ML-cache RPCs. The nested call is redundant
(the in-flight RPC is already releasing server state, which is also evicted on session end),
so it is skipped and a WARNING is logged -- turning a silent multi-minute hang into a fast,
observable signal that pinpoints the cause if it recurs in scheduled jobs.

This is a no-regression safety net: in normal operation no ML-cache RPC is in flight when
another is issued, so behavior is unchanged.

Co-authored-by: Isaac
…snapshot wait

When the deterministic snapshot-upload wait times out, surface the actual contents of the
partition state directory so a recurrence in scheduled jobs is immediately diagnosable
(snapshot still pending vs. cleaned up vs. wrong dir) instead of a bare timeout.
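
One way to produce that kind of diagnostic is sketched below in Python. The helper name and the message wording are hypothetical; the point is only that the three cases named above become distinguishable in the failure output.

```python
import os
import tempfile


def describe_partition_dir(path):
    # Summarize the directory so a timeout message shows what was actually there.
    if not os.path.isdir(path):
        return f"{path}: directory does not exist (wrong dir or cleaned up)"
    names = sorted(os.listdir(path))
    if not names:
        return f"{path}: directory is empty (snapshot may still be pending)"
    return f"{path}: contains {names}"


with tempfile.TemporaryDirectory() as root:
    missing = describe_partition_dir(os.path.join(root, "absent"))
    empty = describe_partition_dir(root)
    open(os.path.join(root, "1.zip"), "w").close()
    populated = describe_partition_dir(root)
```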

Verified deflake holds across 10 consecutive runs (fork CI run 28074724227, 10/10 passed).

Co-authored-by: Isaac
…t for tws deflake

Supersedes the earlier end-of-stream wait, which could still time out on the avro variant
(maintenance only snapshots the current version). Validated both encodings x8 (run 28081816565).
Matches standalone PR apache#56721.

Co-authored-by: Isaac