[SPARK-56773][CORE][TESTS] Add more fetch-failure injection knobs for INJECT_SHUFFLE_FETCH_FAILURES #55738
juliuszsompolski wants to merge 1 commit into apache:master
@cloud-fan I added some refinements to the stage retry testing I added with #55371. This makes it exercise a bigger variety of scenarios. I also want to use it for more correctness testing in the presence of retries and recomputes.
…ing stage retries and rollback

Extends the test-only INJECT_SHUFFLE_FETCH_FAILURES injection in the DAGScheduler with two orthogonal knobs that together let unit tests force the full range of stage-retry shapes that ordinary fetch-failure injection cannot reach:

- INJECT_SHUFFLE_FETCH_FAILURES (existing master switch). Semantics extended: it now corrupts the partition-0 task of the FIRST SUCCESSFUL attempt of every shuffle map stage, including non-leaf stages (whose attempt 0 typically fails on fetch from upstream and was never corrupted under the previous semantics).
- INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY (int, default 1). Defers the producer's mapper-0 corruption until N task-success events have arrived from stages that consume the shuffle. Because the DAGScheduler event loop processes task-completion events serially, this guarantees that N consumer tasks fully completed BEFORE the FetchFailed cascade kicks in -- the realistic "lost shuffle on recompute" shape rather than corrupting at producer registration time. Subsequent consumer tasks dispatched to free slots after the corruption see the invalid MapStatus and FetchFailed. With shuffle.partitions much larger than the number of executor cores, this gives a deterministic "partial first-attempt + recompute" shape. Set to 0 to corrupt inline at registration.
- INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE (boolean, default false). After a downstream FetchFailed forces the producer's partition 0 to be recomputed, the recomputed task's MapStatus registration is artificially flagged as a checksum mismatch. The DAGScheduler then runs rollbackSucceedingStages, which clears the downstream ShuffleMapStage's outputs and forces a full retry of that stage (every previously-finished partition runs again). This mirrors the production path where a recomputed shuffle output from a non-deterministic producer has a different checksum.

ResultStage downstreams still abort because OSS Spark does not support rolling them back (SPARK-25342). Adds tests in MetricsFailureInjectionSuite covering the recompute injection (with and without rollback) and the delayed-corruption timing guarantee.

Co-authored-by: Isaac
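The delayed-corruption bookkeeping described above can be sketched as a toy model. This is NOT the PR's actual code; the class and method names are illustrative. It only captures the contract: with a positive delay, corruption fires after the N-th consumer task-success event; with delay 0, it fires inline at producer registration; and it fires at most once.

```scala
// Toy model of INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY semantics.
// The real DAGScheduler event loop is single-threaded, so plain vars suffice here too.
final class DelayedCorruption(delay: Int) {
  private var remaining = delay
  private var fired = false

  /** delay == 0 means: corrupt inline when the producer's map output registers. */
  def corruptAtRegistration(): Boolean = {
    if (!fired && remaining == 0) { fired = true; true } else false
  }

  /** Called once per consumer task-success event; returns true when corruption fires. */
  def onConsumerTaskSuccess(): Boolean = {
    if (fired) return false
    remaining -= 1
    if (remaining <= 0) { fired = true; true } else false
  }
}

// Default delay = 1: exactly one consumer task completes before the cascade.
val delayed = new DelayedCorruption(1)
assert(!delayed.corruptAtRegistration())  // not corrupted inline
assert(delayed.onConsumerTaskSuccess())   // fires after the first consumer success
assert(!delayed.onConsumerTaskSuccess())  // fires only once

// delay = 0: corrupt at registration time instead.
val inline = new DelayedCorruption(0)
assert(inline.corruptAtRegistration())
```

Because task-completion events are processed serially by the event loop, a counter like this is enough to guarantee the "N consumers finished first" ordering without any synchronization.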
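The rollback behavior the commit message describes (roll back downstream shuffle map stages, abort partially-finished result stages per SPARK-25342) can be modeled as a small decision function. Again a toy sketch, NOT Spark source; the types and names are invented for illustration, and the `finishedTasks > 0` guard mirrors the `numMissingPartitions < numTasks` abort condition exercised by the new tests.

```scala
// Toy model of the rollbackSucceedingStages outcome on a checksum mismatch.
sealed trait DownstreamStage
final case class MapStage(name: String) extends DownstreamStage
final case class ResultStage(finishedTasks: Int, numTasks: Int) extends DownstreamStage

sealed trait RollbackOutcome
case object ClearOutputsAndRetry extends RollbackOutcome // every finished partition re-runs
case object AbortStage extends RollbackOutcome           // OSS Spark cannot roll back result stages
case object RetryFromScratch extends RollbackOutcome     // nothing finished yet, so nothing to roll back

def onChecksumMismatchRollback(stage: DownstreamStage): RollbackOutcome = stage match {
  case MapStage(_) =>
    ClearOutputsAndRetry
  case ResultStage(finished, numTasks) if finished > 0 =>
    AbortStage // numMissingPartitions < numTasks: some result tasks already succeeded
  case ResultStage(_, _) =>
    RetryFromScratch
}

assert(onChecksumMismatchRollback(MapStage("stage 2")) == ClearOutputsAndRetry)
assert(onChecksumMismatchRollback(ResultStage(1, 20)) == AbortStage)
assert(onChecksumMismatchRollback(ResultStage(0, 20)) == RetryFromScratch)
```

This is why the description below keeps the result-stage delay at 0 by default (zero finished result tasks at rollback time) and only opts into a positive delay when the abort path itself is under test.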
What changes were proposed in this pull request?
Extends the test-only fetch-failure injection in `DAGScheduler` with three new knobs and changes the semantics of the existing master switch. Four flags total (all in `org.apache.spark.internal.config.Tests`):

- `INJECT_SHUFFLE_FETCH_FAILURES` (existing). Semantics changed: previously it corrupted every map task of stage attempt 0 (so only leaf shuffle map stages were ever affected, since non-leaf stages typically fail-fetch on attempt 0). Now it corrupts the partition-0 task of the first SUCCESSFUL attempt of every shuffle map stage, including non-leaf stages whose attempt 0 fails on fetch from upstream.
- `INJECT_SHUFFLE_FETCH_FAILURES_DOWNSTREAM_DELAY` (int, default `1`, new). Defers the producer's mapper-0 corruption until N task-success events have arrived from `ShuffleMapStage` consumers of the shuffle. The DAGScheduler event loop processes task-completion events serially, so this guarantees N consumer tasks fully completed BEFORE the FetchFailed cascade kicks in. Subsequent tasks dispatched to free slots after the corruption see the invalid `MapStatus` and `FetchFailed`. With `spark.sql.shuffle.partitions` much larger than the number of executor cores, this gives a deterministic "partial first-attempt + recompute" shape. Set to `0` to corrupt inline at registration.
- `INJECT_SHUFFLE_FETCH_FAILURES_RESULT_STAGE_DELAY` (int, default `0`, new). Counterpart of the above for `ResultStage` consumers. With the default `0`, when a `ResultStage` is the consumer of a pending corruption it is corrupted before the result tasks dispatch, so the result stage has zero finished tasks when `INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE` later triggers `rollbackSucceedingStages` (the rollback path would otherwise abort a partially-finished result stage, since OSS Spark does not support rolling them back). Set it to N > 0 to defer until N result-stage tasks have succeeded - the only way to actually exercise the result-stage abort path.
- `INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE` (boolean, default `false`, new).
After a downstream `FetchFailed` forces the producer's partition 0 to be recomputed, the recomputed task's `MapStatus` registration is artificially flagged as a checksum mismatch. The DAGScheduler then runs `rollbackSucceedingStages`, which clears the downstream `ShuffleMapStage`'s outputs and forces a full retry of that stage. `ResultStage` downstreams are aborted (OSS Spark does not support rolling them back, SPARK-25342).

Why are the changes needed?
The existing `INJECT_SHUFFLE_FETCH_FAILURES` flag corrupts attempt 0 of every shuffle map stage, but only leaf shuffle map stages succeed on attempt 0. Non-leaf stages fail-fetch from corrupted upstream on attempt 0 and are never themselves corrupted, so unit tests cannot exercise the full range of stage-retry shapes that production hits (metric stability under non-determinism, rollback for indeterminate producers, SLAM semantics across retries). The new knobs let tests deterministically reach those shapes.

Does this PR introduce any user-facing change?
No. All flags are test-only (gated by `Utils.isTesting`) and live under `spark.testing.*`.

How was this patch tested?
- `MetricsFailureInjectionSuite` (12 tests). Existing tests continue to pass with the new default semantics. The non-deterministic-stage test sees a stage-2 raw metric overcount under the new default (because the checksum-mismatch rollback now fires on the non-determinism); SLAM remains stable, which is the point of that test.
- New test "Three stage metrics force-checksum-mismatch with delayed corruption": with `shuffle.partitions = 20` (much greater than the test's `local[2]` cores) and `delay = 1`, the rollback re-plays at least one already-completed stage-2 partition on top of the full re-run, putting the raw metric strictly above the recompute-only baseline.
- New test "Force checksum mismatch aborts a downstream ResultStage": a 2-stage `groupBy().count()` query where stage 2 is a `ResultStage`. With `RESULT_STAGE_DELAY = 1` (opted in) and `INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE`, one result task succeeds before the FetchFailed cascade; the forced checksum mismatch on stage 1's mapper-0 recompute then fires `rollbackSucceedingStages`, which sees `numMissingPartitions < numTasks` on the result stage and aborts. The query throws a `SparkException` with "indeterminate" in the message.
- `SQLLastAttemptMetricPlanShapesSuite` (220 tests; previously parameterised on `stageRetries: Boolean`, now on a tri-valued `FailureMode`: `NoFailure`, `FetchFailure`, `ChecksumMismatch`). Each plan shape is now also exercised under forced checksum-mismatch rollback; SLAM values remain stable.
- `SQLLastAttemptMetricIntegrationSuiteWithChecksumMismatch` (new subclass): runs the full integration suite with `INJECT_SHUFFLE_FETCH_FAILURES + INJECT_SHUFFLE_FORCE_CHECKSUM_MISMATCH_ON_RECOMPUTE` enabled.
- `core/scalastyle`, `sql/scalastyle`, and 149 `DAGSchedulerSuite` tests pass.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code, Opus 4.7.