Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 18 additions & 60 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1998,9 +1998,9 @@ index 104b4e416cd..b8af360fa14 100644
// Note that, if record level filtering is enabled, it should be a single record.
// If no filter is pushed down to Parquet, it should be the total length of data.
- assert(actual > 1 && actual < data.length)
+ // Only enable Comet test iff it's scan only, since with native execution
+ // Skip when Comet is enabled since with native execution
+ // `stripSparkFilter` can't remove the native filter
+ if (!isCometEnabled || isCometScanOnly) {
+ if (!isCometEnabled) {
+ assert(actual > 1 && actual < data.length)
+ }
}
Expand All @@ -2011,9 +2011,9 @@ index 104b4e416cd..b8af360fa14 100644
// Note that, if record level filtering is enabled, it should be a single record.
// If no filter is pushed down to Parquet, it should be the total length of data.
- assert(actual > 1 && actual < data.length)
+ // Only enable Comet test iff it's scan only, since with native execution
+ // Skip when Comet is enabled since with native execution
+ // `stripSparkFilter` can't remove the native filter
+ if (!isCometEnabled || isCometScanOnly) {
+ if (!isCometEnabled) {
+ assert(actual > 1 && actual < data.length)
+ }
}
Expand Down Expand Up @@ -2926,32 +2926,14 @@ index dd55fcfe42c..99bc018008a 100644
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
super.test(testName, testTags: _*) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
@@ -242,6 +272,11 @@ private[sql] trait SQLTestUtilsBase
protected override def _sqlContext: SQLContext = self.spark.sqlContext
}

+ /**
+ * Whether Comet extension is enabled
+ */
+ protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
+
+ /**
+ * Whether to enable ansi mode This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
+ protected def enableCometAnsiMode: Boolean = {
+ val v = System.getenv("ENABLE_COMET_ANSI_MODE")
+ v != null && v.toBoolean
+ }
+
+ /**
+ * Whether Spark should only apply Comet scan optimization. This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
+ protected def isCometScanOnly: Boolean = {
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ v != null && v.toBoolean
+ }
+
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
Expand All @@ -2969,7 +2951,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSessio
index ed2e309fa07..a5ea58146ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
Expand All @@ -2980,23 +2962,12 @@ index ed2e309fa07..a5ea58146ad 100644
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
+
+ if (!isCometScanOnly) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ if (enableCometAnsiMode) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ }
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
Expand Down Expand Up @@ -3093,7 +3064,7 @@ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.sca
index 07361cfdce9..97dab2a3506 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -55,25 +55,54 @@ object TestHive
@@ -55,25 +55,41 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
Expand Down Expand Up @@ -3140,24 +3111,11 @@ index 07361cfdce9..97dab2a3506 100644
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
+
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ if (v == null || !v.toBoolean) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ val a = System.getenv("ENABLE_COMET_ANSI_MODE")
+ if (a != null && a.toBoolean) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ }
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ }

+ conf
Expand Down
78 changes: 18 additions & 60 deletions dev/diffs/3.5.8.diff
Original file line number Diff line number Diff line change
Expand Up @@ -1979,9 +1979,9 @@ index 8e88049f51e..20d7ef7b1bc 100644
// Note that, if record level filtering is enabled, it should be a single record.
// If no filter is pushed down to Parquet, it should be the total length of data.
- assert(actual > 1 && actual < data.length)
+ // Only enable Comet test iff it's scan only, since with native execution
+ // Skip when Comet is enabled since with native execution
+ // `stripSparkFilter` can't remove the native filter
+ if (!isCometEnabled || isCometScanOnly) {
+ if (!isCometEnabled) {
+ assert(actual > 1 && actual < data.length)
+ }
}
Expand All @@ -1992,9 +1992,9 @@ index 8e88049f51e..20d7ef7b1bc 100644
// Note that, if record level filtering is enabled, it should be a single record.
// If no filter is pushed down to Parquet, it should be the total length of data.
- assert(actual > 1 && actual < data.length)
+ // Only enable Comet test iff it's scan only, since with native execution
+ // Skip when Comet is enabled since with native execution
+ // `stripSparkFilter` can't remove the native filter
+ if (!isCometEnabled || isCometScanOnly) {
+ if (!isCometEnabled) {
+ assert(actual > 1 && actual < data.length)
+ }
}
Expand Down Expand Up @@ -2878,32 +2878,14 @@ index e937173a590..7d20538bc68 100644
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
super.test(testName, testTags: _*) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
@@ -242,6 +272,29 @@ private[sql] trait SQLTestUtilsBase
@@ -242,6 +272,11 @@ private[sql] trait SQLTestUtilsBase
protected override def _sqlContext: SQLContext = self.spark.sqlContext
}

+ /**
+ * Whether Comet extension is enabled
+ */
+ protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
+
+ /**
+ * Whether to enable ansi mode This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
+ protected def enableCometAnsiMode: Boolean = {
+ val v = System.getenv("ENABLE_COMET_ANSI_MODE")
+ v != null && v.toBoolean
+ }
+
+ /**
+ * Whether Spark should only apply Comet scan optimization. This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
+ protected def isCometScanOnly: Boolean = {
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ v != null && v.toBoolean
+ }
+
protected override def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
SparkSession.setActiveSession(spark)
Expand All @@ -2921,7 +2903,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSessio
index ed2e309fa07..a5ea58146ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
@@ -74,6 +74,20 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
Expand All @@ -2932,23 +2914,12 @@ index ed2e309fa07..a5ea58146ad 100644
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
+
+ if (!isCometScanOnly) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ if (enableCometAnsiMode) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ }
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ }
conf.set(
StaticSQLConf.WAREHOUSE_PATH,
Expand Down Expand Up @@ -3045,7 +3016,7 @@ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.sca
index 1d646f40b3e..5babe505301 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -53,25 +53,54 @@ object TestHive
@@ -53,25 +53,41 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
Expand Down Expand Up @@ -3092,24 +3063,11 @@ index 1d646f40b3e..5babe505301 100644
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
+
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ if (v == null || !v.toBoolean) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+
+ val a = System.getenv("ENABLE_COMET_ANSI_MODE")
+ if (a != null && a.toBoolean) {
+ conf
+ .set("spark.sql.ansi.enabled", "true")
+ }
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ }

+ conf
Expand Down
58 changes: 18 additions & 40 deletions dev/diffs/4.0.2.diff
Original file line number Diff line number Diff line change
Expand Up @@ -2583,9 +2583,9 @@ index 6080a5e8e4b..ea058d57b4b 100644
// Note that, if record level filtering is enabled, it should be a single record.
// If no filter is pushed down to Parquet, it should be the total length of data.
- assert(actual > 1 && actual < data.length)
+ // Only enable Comet test iff it's scan only, since with native execution
+ // Skip when Comet is enabled since with native execution
+ // `stripSparkFilter` can't remove the native filter
+ if (!isCometEnabled || isCometScanOnly) {
+ if (!isCometEnabled) {
+ assert(actual > 1 && actual < data.length)
+ }
}
Expand All @@ -2596,9 +2596,9 @@ index 6080a5e8e4b..ea058d57b4b 100644
// Note that, if record level filtering is enabled, it should be a single record.
// If no filter is pushed down to Parquet, it should be the total length of data.
- assert(actual > 1 && actual < data.length)
+ // Only enable Comet test iff it's scan only, since with native execution
+ // Skip when Comet is enabled since with native execution
+ // `stripSparkFilter` can't remove the native filter
+ if (!isCometEnabled || isCometScanOnly) {
+ if (!isCometEnabled) {
+ assert(actual > 1 && actual < data.length)
+ }
}
Expand Down Expand Up @@ -3596,23 +3596,14 @@ index f0f3f94b811..f77b54dcef9 100644
if (testTags.exists(_.isInstanceOf[DisableAdaptiveExecution])) {
super.test(testName, testTags: _*) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
@@ -248,8 +278,24 @@ private[sql] trait SQLTestUtilsBase
@@ -248,8 +278,15 @@ private[sql] trait SQLTestUtilsBase
override protected def converter: ColumnNodeToExpressionConverter = self.spark.converter
}

+ /**
+ * Whether Comet extension is enabled
+ */
+ protected def isCometEnabled: Boolean = SparkSession.isCometEnabled
+
+ /**
+ * Whether Spark should only apply Comet scan optimization. This is only effective when
+ * [[isCometEnabled]] returns true.
+ */
+ protected def isCometScanOnly: Boolean = {
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ v != null && v.toBoolean
+ }
+
protected override def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
SparkSession.setActiveSession(spark)
Expand All @@ -3634,7 +3625,7 @@ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSessio
index 245219c1756..a611836f086 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -75,6 +75,27 @@ trait SharedSparkSessionBase
@@ -75,6 +75,21 @@ trait SharedSparkSessionBase
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
Expand All @@ -3645,18 +3636,12 @@ index 245219c1756..a611836f086 100644
+ .set("spark.comet.enabled", "true")
+ .set("spark.comet.parquet.respectFilterPushdown", "true")
+
+ if (!isCometScanOnly) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ .set("spark.comet.memoryOverhead", "10g")
+
+ }
conf.set(
Expand Down Expand Up @@ -3783,7 +3768,7 @@ diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.sca
index a394d0b7393..a4bc3d3fd8e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -53,24 +53,41 @@ object TestHive
@@ -53,24 +53,34 @@ object TestHive
new SparkContext(
System.getProperty("spark.sql.test.master", "local[1]"),
"TestSQLContext",
Expand Down Expand Up @@ -3823,18 +3808,11 @@ index a394d0b7393..a4bc3d3fd8e 100644
+ .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
+ .set("spark.comet.enabled", "true")
+
+ val v = System.getenv("ENABLE_COMET_SCAN_ONLY")
+ if (v == null || !v.toBoolean) {
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ } else {
+ conf
+ .set("spark.comet.exec.enabled", "false")
+ .set("spark.comet.exec.shuffle.enabled", "false")
+ }
+ conf
+ .set("spark.comet.exec.enabled", "true")
+ .set("spark.shuffle.manager",
+ "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
+ .set("spark.comet.exec.shuffle.enabled", "true")
+ }
+
+ conf
Expand Down
Loading
Loading