Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -901,61 +901,47 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan with CodegenSup
}
}

// `WidenSetOperationTypes` inserts a `Project(Cast)` above each child whose
// dataType differs from the widened set type, so on the codegen path
// `src.dataType == tgt.dataType` holds. The Alias only remaps each child
// attribute onto the union's output exprId/name/metadata. Mismatched cases
// are gated upstream by `allChildOutputDataTypesMatch`, so the assert is a
// defensive guard.
// Decides whether this is a "plain" union, i.e. the codegen path applies:
// `outputPartitioning` is `UnknownPartitioning`, and `unionedInputRDD` matches
// the semantics of `sparkContext.union(...)` in `doExecute`.
private def isPlainUnion: Boolean = outputPartitioning match {
  case _: UnknownPartitioning => true
  case _ => false
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would using @transient lazy val be a bit better?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very simple computation, so using a `lazy val` is likely not worth it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fine by me.


// Per-child projection from a child's output onto the union's output. The
// expression wrapped by each Alias is always the source `Attribute`
// (deterministic by construction); the Alias only remaps the
// exprId/name/metadata. `WidenSetOperationTypes` aligns top-level dataTypes,
// but nested nullability differences bypass it; those cases are caught by the
// `type-mismatch` gate, which is the single source of truth for the
// `src.dataType == tgt.dataType` invariant `doConsume` relies on — the assert
// below is purely a defensive guard.
@transient private lazy val perChildProjections: IndexedSeq[Seq[NamedExpression]] =
  children.toIndexedSeq.map { child =>
    for ((src, tgt) <- child.output.zip(output)) yield {
      assert(src.dataType == tgt.dataType,
        s"UnionExec child output dataType ${src.dataType} does not match " +
          s"union output dataType ${tgt.dataType}; supportCodegen should " +
          "have returned false via the 'type-mismatch' reason.")
      Alias(src, tgt.name)(
        exprId = tgt.exprId,
        qualifier = tgt.qualifier,
        explicitMetadata = Some(tgt.metadata))
    }
  }

// True iff every child output dataType matches the corresponding union output
// dataType, including all nested nullabilities. `Union.allChildrenCompatible`
// ignores nested nullability, so children differing only there bypass
// `WidenSetOperationTypes`; `UnionExec.output` then merges those flags via
// `StructType.unionLikeMerge`, leaving src/tgt mismatched. Expressed here as
// the negation of "some child has some mismatched column".
@transient private lazy val allChildOutputDataTypesMatch: Boolean =
  !children.exists { c =>
    c.output.zip(output).exists { case (src, tgt) => src.dataType != tgt.dataType }
  }

// Memoized because `supportCodegen` is called multiple times during planning;
// `children` is stable for a given UnionExec instance, so the cached value
// cannot go stale.
@transient private lazy val hasAnyPartitionIndexDependentDescendant: Boolean =
  children.exists(c => UnionExec.hasPartitionIndexDependentCodegen(c))

// Memoized: consulted by `supportCodegen` (called multiple times by
// `CollapseCodegenStages`) and by `metrics`. Conf and children are stable
// for a given UnionExec instance; cross-plan staleness is impossible since
// UnionExec is a case class and `withNewChildren` produces a fresh instance.
@transient private lazy val supportCodegenFailureReason: Option[String] = {
if (!conf.getConf(SQLConf.WHOLESTAGE_UNION_CODEGEN_ENABLED)) {
Some("union-codegen-disabled")
} else if (!outputPartitioning.isInstanceOf[UnknownPartitioning]) {
} else if (!isPlainUnion) {
Some("partitioning-aware")
} else if (children.exists(_.exists(_.isInstanceOf[UnionExec]))) {
Some("nested-union")
} else if (children.exists(_.exists(UnionExec.isKnownMultiInputRDDCodegen))) {
Some("multi-rdd-child")
} else if (hasAnyPartitionIndexDependentDescendant) {
} else if (children.exists(UnionExec.hasPartitionIndexDependentCodegen)) {
Some("partition-index-dependent-child")
} else if (children.size > conf.getConf(SQLConf.WHOLESTAGE_UNION_MAX_CHILDREN)) {
Some("max-children-exceeded")
} else if (supportsColumnar) {
Some("columnar")
} else if (!allChildOutputDataTypesMatch) {
} else if (children.exists(c =>
c.output.zip(output).exists { case (src, tgt) => src.dataType != tgt.dataType })) {
Some("type-mismatch")
} else {
None
Expand Down Expand Up @@ -1002,61 +988,66 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan with CodegenSup

override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(unionedInputRDD)

// Driver-side cursor written by `doProduce` and read by `doConsume` during
// single-threaded code emission; resets to -1 once emission completes.
// Set in `doProduce`, read in `doConsume` during single-threaded code
// emission. `numOutputRowsTerm` is registered once per stage so the
// metric appears in `references[]` exactly once instead of once per
// child. `currentEmittingChild` tells `doConsume` which child's
// projection to bind.
@transient private var numOutputRowsTerm: String = _
@transient private var currentEmittingChild: Int = -1

override protected def doProduce(ctx: CodegenContext): String = {
numOutputRowsTerm = metricTerm(ctx, "numOutputRows")

// For each partition of the unioned RDD, record its owning child and its
// index within that child's RDD. Read both fields directly off the
// `UnionPartition` so the lookup arrays do not assume `UnionRDD` lays
// partitions out in child order.
val (partitionToChild, partitionToLocalIdx) =
unionedInputRDD.partitions.map {
case up: UnionPartition[_] => (up.parentRddIndex, up.parentPartition.index)
case other =>
throw SparkException.internalError(
s"UnionExec: Unexpected partition type ${other.getClass.getName}")
}.unzip
val (partitionToChild, partitionToLocalIdx) = unionedInputRDD.partitions.map { p =>
val up = p.asInstanceOf[UnionPartition[_]]
(up.parentRddIndex, up.parentPartition.index)
}.unzip
val p2cRef = ctx.addReferenceObj("partitionToChild", partitionToChild)
val p2lRef = ctx.addReferenceObj("partitionToLocalIdx", partitionToLocalIdx)
val childIndexVar = ctx.freshName("unionChildIdx")

// Each child's produce output is wrapped in its own helper method. The
// outer `switch` in `doProduce`'s return value dispatches to the helper.
// Each child's produced code is wrapped in its own helper method.
// Without this, the fused method's bytecode grows linearly with the
// number of children and quickly exceeds HotSpot's per-method limit,
// forcing the whole stage to run interpreted.
//
// `partitionIndex` is passed as a parameter (shadowing the superclass
// field) rather than read from the enclosing scope. `addNewFunction` may
// spill helpers into a nested class when the outer class fills up, and a
// nested class cannot access the protected
// `BufferedRowIterator.partitionIndex` field. Using the parameter name
// `partitionIndex` keeps any child-emitted reference to that identifier
// resolving locally.
// The helper takes `int partitionIndex` as a parameter; `addNewFunction`
// may spill helpers into a nested class once the outer class fills up,
// and a nested class cannot access the protected
// `BufferedRowIterator.partitionIndex` field.
//
// `currentPartitionIndexVar` is rebound to an array-deref expression
// (rather than a local) so leaf operators (`RangeExec`, `SampleExec`)
// see the child-local index regardless of where their code is emitted.
// `SampleExec.doConsume` uses `addMutableState`, whose initializer is
// emitted into the state-init function, not the helper - a local in
// the helper would not be in scope there. The expression resolves
// against `partitionIndex` (the helper parameter inside the helper,
// and the `BufferedRowIterator` field elsewhere) in every context.
val savedPartIdxVar = ctx.currentPartitionIndexVar
val cases = try {
children.zipWithIndex.map { case (c, i) =>
currentEmittingChild = i
ctx.currentPartitionIndexVar = s"((int[]) $p2lRef)[partitionIndex]"
val producedCode = c.asInstanceOf[CodegenSupport].produce(ctx, this)
val helper = ctx.freshName("unionChildProcess")
val qualifiedHelper = ctx.addNewFunction(helper,
s"""
|private void $helper(int partitionIndex) throws java.io.IOException {
| $producedCode
|}
""".stripMargin)
s"""case $i: {
| $qualifiedHelper(partitionIndex);
| break;
|}""".stripMargin
}
} finally {
currentEmittingChild = -1
ctx.currentPartitionIndexVar = savedPartIdxVar
ctx.currentPartitionIndexVar = s"((int[]) $p2lRef)[partitionIndex]"
val cases = children.zipWithIndex.map { case (c, i) =>
currentEmittingChild = i
val producedCode = c.asInstanceOf[CodegenSupport].produce(ctx, this)
val helper = ctx.freshName("unionChildProcess")
val qualifiedHelper = ctx.addNewFunction(helper,
s"""
|private void $helper(int partitionIndex) throws java.io.IOException {
| $producedCode
|}
""".stripMargin)
s"""case $i: {
| $qualifiedHelper(partitionIndex);
| break;
|}""".stripMargin
}
currentEmittingChild = -1
ctx.currentPartitionIndexVar = savedPartIdxVar

s"""
|int $childIndexVar = ((int[]) $p2cRef)[partitionIndex];
Expand All @@ -1071,24 +1062,17 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan with CodegenSup

override def doConsume(
ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
require(currentEmittingChild >= 0,
"UnionExec.doConsume invoked outside doProduce emission window")
val i = currentEmittingChild
// The wrapped child in each `perChildProjections(i)` element is always an
// `Attribute`, which is deterministic by definition; no
// `evaluateRequiredVariables` call is needed to force single-evaluation
// of non-deterministic expressions.
val bound = BindReferences.bindReferences(perChildProjections(i), children(i).output)

require(i >= 0, "UnionExec.doConsume invoked outside doProduce emission window")
// Route BoundReference reads through `currentVars` (the incoming row is
// delivered as variables under WSCG, not via ctx.INPUT_ROW).
val bound = BindReferences.bindReferences(perChildProjections(i), children(i).output)
ctx.currentVars = input
ctx.INPUT_ROW = null
val projectedExprCodes = bound.map(_.genCode(ctx))

val numOutput = metricTerm(ctx, "numOutputRows")
s"""
|$numOutput.add(1L);
|$numOutputRowsTerm.add(1L);
|${consume(ctx, projectedExprCodes)}
""".stripMargin
}
Expand All @@ -1103,7 +1087,7 @@ case class UnionExec(children: Seq[SparkPlan]) extends SparkPlan with CodegenSup
override def usedInputs: AttributeSet = AttributeSet.empty

protected override def doExecute(): RDD[InternalRow] = {
if (outputPartitioning.isInstanceOf[UnknownPartitioning]) {
if (isPlainUnion) {
sparkContext.union(children.map(_.execute()))
} else {
// This union has a known partitioning, i.e., its children have the same partitioning
Expand Down Expand Up @@ -1138,13 +1122,24 @@ object UnionExec {
}

/**
* True if any expression in the subtree is [[Nondeterministic]]. Such
* expressions may embed the raw `partitionIndex` field via
* `addPartitionInitializationStatement`, which would read the global
* True if any expression in the subtree embeds the raw `partitionIndex` field
* via `addPartitionInitializationStatement`, which would read the global
* UnionRDD index instead of the child-local one under fusion.
*
* The check uses [[Nondeterministic]] as the proxy: every catalyst expression
* that calls `addPartitionInitializationStatement` referencing `partitionIndex`
* is `Nondeterministic`. The `InputFile*` expressions are `Nondeterministic`
* but read from `InputFileBlockHolder` (a per-task thread-local) and do not
* embed `partitionIndex`, so they are safe under fusion.
*/
def hasPartitionIndexDependentCodegen(p: SparkPlan): Boolean = p.exists {
plan => plan.expressions.exists(_.exists(_.isInstanceOf[Nondeterministic]))
def hasPartitionIndexDependentCodegen(p: SparkPlan): Boolean = p.exists { plan =>
plan.expressions.exists(_.exists {
case _: InputFileName => false
case _: InputFileBlockStart => false
case _: InputFileBlockLength => false
case _: Nondeterministic => true
case _ => false
})
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,57 @@ class UnionCodegenSuite extends QueryTest with SharedSparkSession {
"numOutputRows should be 0 for all-empty union")
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

 test("SPARK-56482: union with sample children fuses (or falls back) without crashing") {
      val a = rangeDF(20).sample(false, 0.5, 1L)
      val b = rangeDF(20).sample(false, 0.5, 1L)
      val df = a.union(b).filter(col("id") > 0)
      df.collect()
      assertFlagParity(() => a.union(b).orderBy("id"))
    }

After this PR, the test above appears to fail:

13:47:26.119 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
13:47:27.985 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator: Failed to compile the generated Java code.
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"
        at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:13014)
        at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7199)
        at org.codehaus.janino.UnitCompiler.access$14300(UnitCompiler.java:236)
        at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6684)
        at org.codehaus.janino.UnitCompiler$23.visitPackage(UnitCompiler.java:6681)
        at org.codehaus.janino.Java$Package.accept(Java.java:4627)
        at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6681)
13:47:27.992 ERROR org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskCont...
[info] - SPARK-56482: union with sample children fuses (or falls back) without crashing *** FAILED *** (1 second, 214 milliseconds)
[info]   java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 168, Column 44: Unknown variable or type "childLocalIdx"

test("SPARK-56482: partitioning-aware union falls back to non-codegen") {
// After repartition, both children expose a `HashPartitioning` on the same key,
// so `UnionExec.outputPartitioning` is non-Unknown and the codegen path is denied.
// AQE is disabled here so the executedPlan exposes the UnionExec directly
// (under AQE the plan is wrapped in `AdaptiveSparkPlanExec`, which does not
// surface its inputPlan via `children`).
withSQLConf(
SQLConf.UNION_OUTPUT_PARTITIONING.key -> "true",
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val a = rangeDF(100).repartition(4, col("id"))
val b = rangeDF(100, 200).repartition(4, col("id"))
val df = a.union(b)
assert(!unionInsideWSCG(df),
"Partitioning-aware union must not fuse into WSCG")
// `getOrElse(fail(...))` instead of the partial `.get`: if no UnionExec is
// found the test reports a descriptive failure rather than a bare
// NoSuchElementException.
val unionExec = df.queryExecution.executedPlan.collectFirst {
case u: UnionExec => u
}.getOrElse(fail("expected a UnionExec in the executed plan"))
assert(!unionExec.metrics.contains("numOutputRows"),
"numOutputRows metric must not be registered on the partitioning-aware path")
assertFlagParity(() => a.union(b).orderBy("id"))
}
}

test("SPARK-56482: input_file_name child fuses (Nondeterministic but partition-index-free)") {
// `InputFileName` is `Nondeterministic` but reads from `InputFileBlockHolder`
// (a per-task thread-local) and does not embed `partitionIndex`, so the
// narrowed gate should allow this union to fuse.
withTempPath { dir =>
val path = dir.getCanonicalPath
rangeDF(20).write.parquet(path)
val left = spark.read.parquet(path).select(col("id"), input_file_name().as("f"))
val right = spark.read.parquet(path).select(col("id"), input_file_name().as("f"))
val unioned = left.union(right).filter(col("id") > 0)
assert(unionInsideWSCG(unioned),
"Union with input_file_name child should fuse into WSCG")
assertFlagParity(() => left.union(right).orderBy("id", "f"))
}
}

test("SPARK-56482: union with sample children fuses (or falls back) without crashing") {
// `SampleExec.doConsume` reads `currentPartitionIndexVar` from inside an
// `addMutableState` initializer, which is emitted into the state-init
// function rather than the per-child helper. The bound expression must
// therefore resolve in any emission scope, not just inside the helper.
val left = rangeDF(20).sample(false, 0.5, 1L)
val right = rangeDF(20).sample(false, 0.5, 1L)
val filtered = left.union(right).filter(col("id") > 0)
filtered.collect()
assertFlagParity(() => left.union(right).orderBy("id"))
}
}

/** Runs [[UnionCodegenSuite]] with ANSI mode enabled. */
Expand Down