
[SPARK-56773][CORE][TESTS] Add more fetch-failure injection knobs for INJECT_SHUFFLE_FETCH_FAILURES #55738

Open

juliuszsompolski wants to merge 1 commit into apache:master from juliuszsompolski:retry-injection-infra

Conversation


@juliuszsompolski (Contributor) commented May 7, 2026

What changes were proposed in this pull request?

Extends the test-only fetch-failure injection in DAGScheduler with three new knobs and changes the semantics of the existing master switch. Four flags total, all in org.apache.spark.internal.config.Tests (a rough sketch of how such entries could be declared follows the list):

  • INJECT_SHUFFLE_FETCH_FAILURES (existing). Semantics changed: previously it corrupted every map task of stage attempt 0, so only leaf shuffle map stages were ever affected, since non-leaf stages typically fail with FetchFailed on attempt 0. It now corrupts the partition-0 task of the first SUCCESSFUL attempt of every shuffle map stage, including non-leaf stages whose attempt 0 fails fetching from a corrupted upstream.

  • INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY (int, default 1, new). Defers the producer's mapper-0 corruption until N task-success events have arrived from ShuffleMapStage consumers of the shuffle. The DAGScheduler event loop processes task-completion events serially, so this guarantees that N consumer tasks have fully completed BEFORE the FetchFailed cascade kicks in. Subsequent tasks dispatched to free slots after the corruption see the invalid MapStatus and fail with FetchFailed. With spark.sql.shuffle.partitions much larger than the executor core count, this gives a deterministic "partial first-attempt + recompute" shape. Set it to 0 to corrupt inline at registration.

  • INJECT_SHUFFLE_FETCH_FAILURES_RESULT_STAGE_DELAY (int, default 0, new). Counterpart of the above for ResultStage consumers. With the default 0, when a ResultStage is the consumer of a pending corruption it is corrupted before the result tasks dispatch, so the result stage has zero finished tasks when INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE later triggers rollbackSucceedingStages (the rollback path would otherwise abort a partially finished result stage, since OSS Spark does not support rolling result stages back). Set it to N > 0 to defer until N result-stage tasks have succeeded, which is the only way to actually exercise the result-stage abort path.

  • INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE (boolean, default false, new). After a downstream FetchFailed forces the producer's partition 0 to be recomputed, the recomputed task's MapStatus registration is artificially flagged as a checksum mismatch. The DAGScheduler then runs rollbackSucceedingStages, which clears the downstream ShuffleMapStage's outputs and forces a full retry of that stage. ResultStage downstreams are aborted (OSS Spark does not support rolling them back, SPARK-25342).
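For orientation, here is a minimal sketch of how the three new entries could be declared in org.apache.spark.internal.config.Tests. Only the constant names and defaults come from this PR; the spark.testing.* key strings, doc texts, and version numbers below are illustrative assumptions, not the exact entries in the patch.

```scala
// Sketch only: key names, docs, and versions are assumptions, not the PR's exact entries.
private[spark] val INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY =
  ConfigBuilder("spark.testing.injectShuffleFetchFailures.downstreamDelay")
    .doc("How many consumer ShuffleMapStage task-success events to wait for before " +
      "corrupting the producer's mapper-0 output; 0 corrupts inline at registration.")
    .version("4.2.0")
    .intConf
    .createWithDefault(1)

private[spark] val INJECT_SHUFFLE_FETCH_FAILURES_RESULT_STAGE_DELAY =
  ConfigBuilder("spark.testing.injectShuffleFetchFailures.resultStageDelay")
    .doc("Same as the downstream delay, but counting ResultStage task successes.")
    .version("4.2.0")
    .intConf
    .createWithDefault(0)

private[spark] val INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE =
  ConfigBuilder("spark.testing.injectShuffleForceChecksumMismatchOnRecompute")
    .doc("Flag the recomputed mapper-0 MapStatus as a checksum mismatch so the " +
      "DAGScheduler runs rollbackSucceedingStages.")
    .version("4.2.0")
    .booleanConf
    .createWithDefault(false)
```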

Why are the changes needed?

The existing INJECT_SHUFFLE_FETCH_FAILURES flag corrupts attempt 0 of every shuffle map stage, but only leaf shuffle map stages succeed on attempt 0. Non-leaf stages fail with FetchFailed from the corrupted upstream on attempt 0 and are never themselves corrupted, so unit tests cannot exercise the full range of stage-retry shapes that production workloads hit (metric stability under non-determinism, rollback for indeterminate producers, SLAM semantics across retries). The new knobs let tests deterministically reach those shapes; an illustrative query shape follows below.
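To make the "non-leaf shuffle map stage" shape concrete, here is an illustrative query, not taken from the patch, assuming an existing SparkSession named spark: the map side of the second shuffle reads the first shuffle's output, so under the old semantics it could never be corrupted, while the new semantics corrupt its first successful attempt.

```scala
import org.apache.spark.sql.functions.{col, count, lit, sum}

// Stage 1: leaf shuffle map stage (map side of the first groupBy shuffle).
// Stage 2: non-leaf shuffle map stage -- it reads stage 1's output and writes the
//          second shuffle; once stage 1's mapper 0 is corrupted, its attempt 0
//          fails with FetchFailed, so only a later attempt can succeed and be
//          corrupted under the new semantics.
// Stage 3: ResultStage collecting the final aggregate.
val df = spark.range(0, 10000)
  .groupBy(col("id") % 100).agg(count(lit(1)).as("c"))   // first shuffle
  .groupBy(col("c") % 10).agg(sum("c"))                   // second shuffle
df.collect()
```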

Does this PR introduce any user-facing change?

No. All flags are test-only (gated by Utils.isTesting) and are under spark.testing.*.

How was this patch tested?

  • MetricsFailureInjectionSuite (12 tests). Existing tests continue to pass under the new default semantics. The non-deterministic-stage test now sees a stage-2 raw-metric overcount under the new default (because checksum-mismatch rollback now fires on the non-determinism); SLAM remains stable, which is the point of that test.

  • New test "Three stage metrics force-checksum-mismatch with delayed corruption": with shuffle.partitions=20 (much greater than the test's local[2] cores) and delay=1, the rollback re-plays at least one already-completed stage-2 partition on top of the full re-run, putting the raw metric strictly above the recompute-only baseline.

  • New test "Force checksum mismatch aborts a downstream ResultStage" (sketched as a test skeleton after this list): a 2-stage groupBy().count() query where stage 2 is a ResultStage. With RESULT_STAGE_DELAY=1 (opted in) and INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE, one result task succeeds before the FetchFailed cascade; the forced checksum mismatch on stage 1's mapper-0 recompute then fires rollbackSucceedingStages, which sees numMissingPartitions < numTasks on the result stage and aborts. The query throws a SparkException with "indeterminate" in the message.

  • SQLLastAttemptMetricPlanShapesSuite (220 tests, was previously parameterised on stageRetries: Boolean, now on a tri-valued FailureMode: NoFailure, FetchFailure, ChecksumMismatch). Each plan shape is now also exercised under forced checksum-mismatch rollback; SLAM values remain stable.

  • SQLLastAttemptMetricIntegrationSuiteWithChecksumMismatch (new subclass): runs the full integration suite with INJECT_SHUFFLE_FETCH_FAILURES + INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE enabled.

  • core/scalastyle, sql/scalastyle, and 149 DAGSchedulerSuite tests pass.
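The ResultStage-abort scenario from the list above, roughly as a test skeleton. The conf keys below are placeholders for the actual spark.testing.* entries, and the suite boilerplate (a SharedSparkSession-style base class providing spark and withSQLConf) is assumed rather than shown; the real test in the patch may differ.

```scala
import org.apache.spark.SparkException
import org.apache.spark.sql.functions.col

test("Force checksum mismatch aborts a downstream ResultStage (sketch)") {
  withSQLConf(
      "spark.testing.injectShuffleFetchFailures" -> "true",                      // placeholder key
      "spark.testing.injectShuffleFetchFailures.resultStageDelay" -> "1",        // placeholder key
      "spark.testing.injectShuffleForceChecksumMismatchOnRecompute" -> "true") { // placeholder key
    // One result task is allowed to succeed; the forced checksum mismatch on the
    // producer's recompute then rolls back succeeding stages and aborts the ResultStage.
    val e = intercept[SparkException] {
      spark.range(0, 1000).groupBy(col("id") % 10).count().collect()
    }
    assert(e.getMessage.toLowerCase.contains("indeterminate"))
  }
}
```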

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code, Opus 4.7.

@juliuszsompolski
Contributor Author

@cloud-fan I added some refinements to the stage-retry testing introduced in #55371. This makes it exercise a bigger variety of scenarios, and I also want to use it for more correctness testing in the presence of retries and recomputes.

…ing stage retries and rollback

Extends the test-only INJECT_SHUFFLE_FETCH_FAILURES injection in the
DAGScheduler with two orthogonal knobs that together let unit tests
force the full range of stage-retry shapes that ordinary fetch-failure
injection cannot reach:

- INJECT_SHUFFLE_FETCH_FAILURES (existing master switch). Semantics
  extended: it now corrupts the partition-0 task of the FIRST
  SUCCESSFUL attempt of every shuffle map stage, including non-leaf
  stages (whose attempt 0 typically fails on fetch from upstream and
  is never corrupted under the previous semantics).

- INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY (int, default 1).
  Defers the producer's mapper-0 corruption until N task-success
  events have arrived from stages that consume the shuffle. Because
  the DAGScheduler event loop processes task-completion events
  serially, this guarantees that N consumer tasks fully completed
  BEFORE the FetchFailed cascade kicks in -- the realistic
  "lost shuffle on recompute" shape rather than corrupting at
  producer registration time. Subsequent consumer tasks dispatched to
  free slots after the corruption see the invalid MapStatus and
  FetchFailed. With shuffle.partitions much larger than executor
  cores, this gives a deterministic "partial first-attempt + recompute"
  shape. Set to 0 to corrupt inline at registration.

- INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE (boolean,
  default false). After a downstream FetchFailed forces the
  producer's partition 0 to be recomputed, the recomputed task's
  MapStatus registration is artificially flagged as a checksum
  mismatch. The DAGScheduler then runs rollbackSucceedingStages,
  which clears the downstream ShuffleMapStage's outputs and forces a
  full retry of that stage (every previously-finished partition runs
  again). Mirrors the production path where a recomputed shuffle
  output has a different checksum from a non-deterministic producer.
  ResultStage downstreams still abort because OSS Spark does not
  support rolling them back (SPARK-25342).

Adds tests in MetricsFailureInjectionSuite covering the recompute
injection (with and without rollback) and the delayed-corruption
timing guarantee.

Co-authored-by: Isaac
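Purely as a mental model for the delayed-corruption bookkeeping described in the commit message (the real wiring is in the DAGScheduler changes of this diff, and all names below are hypothetical): a per-shuffle countdown kept inside the single-threaded event loop, decremented on each consumer task success, with the corruption fired when it reaches zero.

```scala
import scala.collection.mutable

// Hypothetical sketch, not the PR's code. The real bookkeeping would live inside the
// DAGScheduler event loop, so the countdown is serialized with task-completion handling.
class DelayedCorruptionTracker(corruptMapperZero: Int => Unit) {
  // shuffleId -> number of consumer task successes still to wait for
  private val pending = mutable.Map.empty[Int, Int]

  // delay = 0 corrupts inline at shuffle registration, matching the documented default.
  def register(shuffleId: Int, delay: Int): Unit =
    if (delay <= 0) corruptMapperZero(shuffleId) else pending(shuffleId) = delay

  // Called once per consumer task-success event processed by the event loop.
  def onConsumerTaskSuccess(shuffleId: Int): Unit =
    pending.get(shuffleId).foreach { remaining =>
      if (remaining <= 1) { pending.remove(shuffleId); corruptMapperZero(shuffleId) }
      else pending(shuffleId) = remaining - 1
    }
}
```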
