Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1688,27 +1688,36 @@ impl PhysicalPlanner {
let left = Arc::clone(&join_params.left.native_plan);
let right = Arc::clone(&join_params.right.native_plan);

// Null-aware anti-join must run in CollectLeft mode. In Partitioned mode
// each partition only sees per-partition null/emptiness state, which can
// produce wrong NOT IN results across partitions. DataFusion's JoinSelection
// rewrites null-aware joins to CollectLeft for this reason, but Comet
// executes the physical plan directly so we must pick the mode here.
let partition_mode = if join.null_aware_anti_join {
PartitionMode::CollectLeft
} else {
PartitionMode::Partitioned
};

let hash_join = Arc::new(HashJoinExec::try_new(
left,
right,
join_params.join_on,
join_params.join_filter,
&join_params.join_type,
None,
PartitionMode::Partitioned,
partition_mode,
// null does not equal null in Spark join keys. If the join key uses
// `EqualNullSafe`, Spark will rewrite it during planning.
NullEquality::NullEqualsNothing,
// null_aware is for null-aware anti joins (NOT IN subqueries).
// NullEquality controls whether NULL = NULL in join keys generally,
// while null_aware changes anti-join semantics so any NULL changes
// the entire result. Spark doesn't use this path (it rewrites
// EqualNullSafe at plan time), so false is correct.
false,
join.null_aware_anti_join,
)?);

// If the hash join is build right, we need to swap the left and right
if join.build_side == BuildSide::BuildLeft as i32 {
// If the hash join uses the build-right side, we need to swap the left and right inputs.
// Exception: null-aware anti-join requires LeftAnti + build-right semantics
// (which matches DataFusion's default), and swap_inputs would turn LeftAnti
// into RightAnti, which DataFusion rejects with null_aware=true.
if join.build_side == BuildSide::BuildLeft as i32 || join.null_aware_anti_join {
Ok((
scans,
shuffle_scans,
Expand Down Expand Up @@ -4038,6 +4047,7 @@ mod tests {
join_type: 0,
condition: None,
build_side: 0,
null_aware_anti_join: false,
})),
};

Expand Down
3 changes: 3 additions & 0 deletions native/proto/src/proto/operator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,9 @@ message HashJoin {
JoinType join_type = 3;
optional spark.spark_expression.Expr condition = 4;
BuildSide build_side = 5;
// True for BroadcastHashJoinExec null-aware anti-joins (NOT IN subquery semantics).
// When true, any null in the build side suppresses all left rows.
bool null_aware_anti_join = 6;
}

message SortMergeJoin {
Expand Down
5 changes: 2 additions & 3 deletions spark/src/main/scala/org/apache/comet/rules/RewriteJoin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
package org.apache.comet.rules

import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, BuildSide, JoinSelectionHelper}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.LeftSemi
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution.{SortExec, SparkPlan}
import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoinExec}
Expand Down Expand Up @@ -67,8 +67,7 @@ object RewriteJoin extends JoinSelectionHelper {
def rewrite(plan: SparkPlan): SparkPlan = plan match {
case smj: SortMergeJoinExec =>
getSmjBuildSide(smj) match {
case Some(BuildRight) if smj.joinType == LeftAnti || smj.joinType == LeftSemi =>
// LeftAnti https://github.com/apache/datafusion-comet/issues/457
case Some(BuildRight) if smj.joinType == LeftSemi =>
// LeftSemi https://github.com/apache/datafusion-comet/issues/2667
withInfo(
smj,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1710,10 +1710,10 @@ trait CometHashJoin {
return None
}

if (join.buildSide == BuildRight && join.joinType == LeftAnti) {
// https://github.com/apache/datafusion-comet/issues/457
withInfo(join, "BuildRight with LeftAnti is not supported")
return None
// Only BroadcastHashJoinExec can be null-aware (NOT IN subqueries).
val isNullAwareAntiJoin = join match {
case bhj: BroadcastHashJoinExec => bhj.isNullAwareAntiJoin
case _ => false
}

val condition = join.condition.map { cond =>
Expand Down Expand Up @@ -1754,6 +1754,7 @@ trait CometHashJoin {
.addAllRightJoinKeys(rightKeys.map(_.get).asJava)
.setBuildSide(if (join.buildSide == BuildLeft) OperatorOuterClass.BuildSide.BuildLeft
else OperatorOuterClass.BuildSide.BuildRight)
.setNullAwareAntiJoin(isNullAwareAntiJoin)
condition.foreach(joinBuilder.setCondition)
Some(builder.setHashJoin(joinBuilder).build())
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,60 +1,56 @@
TakeOrderedAndProject
+- HashAggregate
+- Exchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported]
: : : :- CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometNativeScan parquet spark_catalog.default.store_sales
: : : : : +- CometSubqueryBroadcast
: : : : : +- CometBroadcastExchange
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometNativeScan parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometNativeScan parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
+- CometHashAggregate
+- CometProject
+- CometBroadcastHashJoin
:- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
: : +- CometBroadcastHashJoin
: : :- CometBroadcastHashJoin
: : : :- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometNativeScan parquet spark_catalog.default.store_sales
: : : : : +- CometSubqueryBroadcast
: : : : : +- CometBroadcastExchange
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometNativeScan parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometNativeScan parquet spark_catalog.default.date_dim
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometNativeScan parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometNativeScan parquet spark_catalog.default.date_dim
: +- CometBroadcastExchange
: +- CometProject
: +- CometFilter
: +- CometNativeScan parquet spark_catalog.default.customer_address
+- CometBroadcastExchange
+- CometProject
+- CometFilter
+- CometNativeScan parquet spark_catalog.default.customer_demographics

Comet accelerated 35 out of 53 eligible operators (66%). Final plan contains 5 transitions between Spark and Comet.
Comet accelerated 50 out of 53 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet.
Original file line number Diff line number Diff line change
@@ -1,61 +1,57 @@
TakeOrderedAndProject
+- HashAggregate
+- Exchange
+- HashAggregate
+- Project
+- BroadcastHashJoin
:- Project
: +- BroadcastHashJoin
: :- Project
: : +- BroadcastHashJoin
: : :- BroadcastHashJoin [COMET: BuildRight with LeftAnti is not supported]
: : : :- CometNativeColumnarToRow
: : : : +- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales
: : : : : +- SubqueryBroadcast
: : : : : +- BroadcastExchange
: : : : : +- CometNativeColumnarToRow
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : +- BroadcastExchange
: : : +- CometNativeColumnarToRow
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : +- BroadcastExchange
: : +- CometNativeColumnarToRow
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: +- BroadcastExchange
: +- CometNativeColumnarToRow
: +- CometProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
+- BroadcastExchange
+- CometNativeColumnarToRow
CometNativeColumnarToRow
+- CometTakeOrderedAndProject
+- CometHashAggregate
+- CometExchange
+- CometHashAggregate
+- CometProject
+- CometBroadcastHashJoin
:- CometProject
: +- CometBroadcastHashJoin
: :- CometProject
: : +- CometBroadcastHashJoin
: : :- CometBroadcastHashJoin
: : : :- CometBroadcastHashJoin
: : : : :- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometBroadcastHashJoin
: : : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.store_sales
: : : : : +- SubqueryBroadcast
: : : : : +- BroadcastExchange
: : : : : +- CometNativeColumnarToRow
: : : : : +- CometProject
: : : : : +- CometFilter
: : : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : : +- CometBroadcastExchange
: : : : +- CometProject
: : : : +- CometFilter
: : : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometBroadcastHashJoin
: : : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.web_sales
: : : : +- ReusedSubquery
: : : +- CometBroadcastExchange
: : : +- CometProject
: : : +- CometFilter
: : : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometBroadcastHashJoin
: : :- CometScan [native_iceberg_compat] parquet spark_catalog.default.catalog_sales
: : : +- ReusedSubquery
: : +- CometBroadcastExchange
: : +- CometProject
: : +- CometFilter
: : +- CometScan [native_iceberg_compat] parquet spark_catalog.default.date_dim
: +- CometBroadcastExchange
: +- CometProject
: +- CometFilter
: +- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_address
+- CometBroadcastExchange
+- CometProject
+- CometFilter
+- CometScan [native_iceberg_compat] parquet spark_catalog.default.customer_demographics

Comet accelerated 34 out of 53 eligible operators (64%). Final plan contains 6 transitions between Spark and Comet.
Comet accelerated 49 out of 53 eligible operators (92%). Final plan contains 2 transitions between Spark and Comet.
Loading
Loading