Skip to content

[SPARK-56482][SQL][FOLLOWUP] Simplify UnionExec codegen and narrow partition-index gate#55719

Open
cloud-fan wants to merge 2 commits intoapache:masterfrom
cloud-fan:SPARK-56482-followup
Open

[SPARK-56482][SQL][FOLLOWUP] Simplify UnionExec codegen and narrow partition-index gate#55719
cloud-fan wants to merge 2 commits intoapache:masterfrom
cloud-fan:SPARK-56482-followup

Conversation

@cloud-fan
Copy link
Copy Markdown
Contributor

@cloud-fan cloud-fan commented May 6, 2026

What changes were proposed in this pull request?

Followup to SPARK-56482 (#55425). Two groups of changes to UnionExec's whole-stage codegen path.

Code cleanness:

  • Hoist metricTerm("numOutputRows") to doProduce and store it on the instance. doConsume runs once per child during emission, so the previous code registered the same metric N times in references[] for an N-child Union; now once.
  • Drop the dead assert in perChildProjections and the duplicate allChildOutputDataTypesMatch lazy val. The dataType comparison now has a single source of truth in the type-mismatch branch of the gate.
  • Inline the one-shot hasAnyPartitionIndexDependentDescendant lazy val.
  • Drop the unreachable case other in the UnionPartition match and replace with asInstanceOf. unionedInputRDD is built as new UnionRDD(...) two lines up, and getPartitions only ever returns UnionPartition[_].
  • Factor isPlainUnion helper used by the gate and doExecute so the invariant "codegen path matches sparkContext.union semantics" lives in one place.
  • Bind currentPartitionIndexVar to the array-deref expression ((int[]) refs[K])[partitionIndex] directly. An earlier revision hoisted this to a childLocalIdx local at helper entry, but SampleExec.doConsume reads currentPartitionIndexVar from inside an addMutableState initializer, which is emitted into the state-init function — outside the per-child helper — so the local was not in scope and the generated code failed to compile. The expression form resolves in any emission scope (helper parameter or BufferedRowIterator field).
  • Drop the try/finally around codegen state restoration. Codegen failure aborts the whole stage, so the restoration is unreachable.

Gate narrowing:

  • Narrow hasPartitionIndexDependentCodegen to exclude InputFileName, InputFileBlockStart, and InputFileBlockLength. These are Nondeterministic but read from InputFileBlockHolder (a per-task thread-local) and do not embed partitionIndex, so they are safe under fusion. Queries like SELECT input_file_name() FROM a UNION ALL SELECT input_file_name() FROM b now fuse.

Why are the changes needed?

The cleanups remove accidental complexity in the fused code path: an N-fold metric reference, two duplicated dataType comparisons, an unreachable defensive guard, and a try/finally that protects against an unreachable case. The gate narrowing turns a missed optimization (file-scan unions) into a fused plan.

Does this PR introduce any user-facing change?

No. spark.sql.codegen.wholeStage.union.enabled remains off by default; when on, the new behavior fuses additional plans (file-scan unions with input_file_name()) that the previous gate over-rejected.

How was this patch tested?

UnionCodegenSuite, UnionCodegenAnsiSuite, UnionCodegenAqeSuite, and the relevant SQLMetricsSuite test all pass. Three tests added:

  • partitioning-aware union falls back to non-codegen — covers a supportCodegenFailureReason branch that lacked explicit coverage.
  • input_file_name child fuses (Nondeterministic but partition-index-free) — validates the gate narrowing.
  • union with sample children fuses (or falls back) without crashing — regression test for the currentPartitionIndexVar binding (caught by @LuciferYang in review).

The columnar fallback branch is not covered by a new test: reliably constructing a plan where Union.supportsColumnar is true via the user-facing API turned out to be brittle, since ApplyColumnarRulesAndInsertTransitions aggressively rebalances columnar/row transitions.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

…rtition-index gate

### What changes were proposed in this pull request?

Followup to SPARK-56482 (apache#55425). Two groups of changes to `UnionExec`'s
whole-stage codegen path.

Code cleanness:

- Hoist `metricTerm("numOutputRows")` to `doProduce` and store it on the
  instance. `doConsume` runs once per child during emission, so the
  previous code registered the same metric N times in `references[]` for
  an N-child Union; now once.
- Drop the dead `assert` in `perChildProjections` and the duplicate
  `allChildOutputDataTypesMatch` lazy val. The dataType comparison now
  has a single source of truth in the `type-mismatch` branch of the gate.
- Inline the one-shot `hasAnyPartitionIndexDependentDescendant` lazy val.
- Drop the unreachable `case other` in the `UnionPartition` match and
  replace with `asInstanceOf`. `unionedInputRDD` is built as
  `new UnionRDD(...)` two lines up, and `getPartitions` only ever returns
  `UnionPartition[_]`.
- Factor `isPlainUnion` helper used by the gate and `doExecute` so the
  invariant "codegen path matches `sparkContext.union` semantics" lives
  in one place.
- Hoist child-local idx to a `childLocalIdx` local at helper entry.
  References emitted by `RangeExec`/`SampleExec` now read a plain int
  instead of re-evaluating `((int[]) refs[K])[partitionIndex]` per use.
- Drop the `try/finally` around codegen state restoration. Codegen
  failure aborts the whole stage, so the restoration is unreachable.

Initial review findings:

- Narrow `hasPartitionIndexDependentCodegen` to exclude `InputFileName`,
  `InputFileBlockStart`, and `InputFileBlockLength`. These are
  `Nondeterministic` but read from `InputFileBlockHolder` (a per-task
  thread-local) and do not embed `partitionIndex`, so they are safe
  under fusion. Queries like
  `SELECT input_file_name() FROM a UNION ALL SELECT input_file_name() FROM b`
  now fuse.
- Add tests for the `partitioning-aware` fallback branch and a positive
  test for `input_file_name` fusion.

### Why are the changes needed?

The cleanups remove accidental complexity in the fused code path: an
N-fold metric reference, two duplicated dataType comparisons, an
unreachable defensive guard, a per-iteration array deref, and a
try/finally that protects against an unreachable case. The gate
narrowing turns a missed optimization (file-scan unions) into a fused
plan.

### Does this PR introduce _any_ user-facing change?

No. `spark.sql.codegen.wholeStage.union.enabled` remains off by default;
when on, the new behavior fuses additional plans (file-scan unions with
`input_file_name()`) that the previous gate over-rejected.

### How was this patch tested?

`UnionCodegenSuite`, `UnionCodegenAnsiSuite`, `UnionCodegenAqeSuite`, and
the relevant `SQLMetricsSuite` test all pass. Two tests added:
`partitioning-aware union falls back to non-codegen` and
`input_file_name child fuses (Nondeterministic but partition-index-free)`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

Co-authored-by: Isaac
@cloud-fan
Copy link
Copy Markdown
Contributor Author

cc @LuciferYang

"numOutputRows should be 0 for all-empty union")
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 test("SPARK-56482: union with sample children fuses (or falls back) without crashing") {
      val a = rangeDF(20).sample(false, 0.5, 1L)
      val b = rangeDF(20).sample(false, 0.5, 1L)
      val df = a.union(b).filter(col("id") > 0)
      df.collect()
      assertFlagParity(() => a.union(b).orderBy("id"))
    }

After this PR, the test above appears to fail

13:47:26.119 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13:47:27.985 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Failed to compile the generated Java code.
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13014)
        at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7199)
        at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:236)
        at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6684)
        at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6681)
        at org.codehaus.janino.Java$Package.accept(Java.java:4627)
        at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6681)
13:47:27.992 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskCont...
[info] - SPARK-56482: union with sample children fuses (or falls back) without crashing *** FAILED *** (1 second, 214 milliseconds)
[info]   java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"

…te children

Bind `currentPartitionIndexVar` to the array-deref expression rather
than a helper-local variable, so leaf operators like `SampleExec` whose
`addMutableState` initializers are emitted into the state-init function
(outside the per-child helper) can still resolve it.

Co-authored-by: Isaac
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants