diff --git a/.dockerignore b/.dockerignore
index 332acff447..c5b0e9c12c 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -10,7 +10,6 @@ metastore_db
target
common/target
spark-integration/target
-fuzz-testing/target
spark/target
native/target
core/target
diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index c97445ea1d..edc6f5b46d 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -36,7 +36,6 @@ on:
- "spark/src/test/**"
- "common/src/test/**"
- "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
- - "fuzz-testing/**"
- "spark-integration/**"
pull_request:
paths-ignore:
@@ -50,7 +49,6 @@ on:
- "spark/src/test/**"
- "common/src/test/**"
- "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
- - "fuzz-testing/**"
- "spark-integration/**"
# manual trigger
# https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml
index 70f3fc92d5..5d23acb2f8 100644
--- a/.github/workflows/spark_sql_test.yml
+++ b/.github/workflows/spark_sql_test.yml
@@ -36,7 +36,6 @@ on:
- "spark/src/test/**"
- "common/src/test/**"
- "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
- - "fuzz-testing/**"
- "spark-integration/**"
pull_request:
paths-ignore:
@@ -50,7 +49,6 @@ on:
- "spark/src/test/**"
- "common/src/test/**"
- "spark/src/main/scala/org/apache/comet/GenerateDocs.scala"
- - "fuzz-testing/**"
- "spark-integration/**"
# manual trigger
# https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow
diff --git a/fuzz-testing/.gitignore b/fuzz-testing/.gitignore
deleted file mode 100644
index 570ff02a76..0000000000
--- a/fuzz-testing/.gitignore
+++ /dev/null
@@ -1,6 +0,0 @@
-.idea
-target
-spark-warehouse
-queries.sql
-results*.md
-test*.parquet
\ No newline at end of file
diff --git a/fuzz-testing/README.md b/fuzz-testing/README.md
deleted file mode 100644
index c9f4d881d4..0000000000
--- a/fuzz-testing/README.md
+++ /dev/null
@@ -1,139 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-# Comet Fuzz
-
-Comet Fuzz is a standalone project that generates random data and queries, executes the queries against Spark
-with Comet disabled and then enabled, and checks for incompatibilities.
-
-Although it is a simple tool, it has already been useful in finding many bugs.
-
-Comet Fuzz is inspired by the [SparkFuzz](https://ir.cwi.nl/pub/30222) paper from Databricks and CWI.
-
-## Roadmap
-
-Planned areas of improvement:
-
-- ANSI mode
-- Support for all data types, expressions, and operators supported by Comet
-- IF and CASE WHEN expressions
-- Complex (nested) expressions
-- Literal scalar values in queries
-- Add option to avoid grouping and sorting on floating-point columns
-- Improve join query support:
- - Support joins without join keys
- - Support composite join keys
- - Support multiple join keys
- - Support join conditions that use expressions
-
-## Usage
-
-From the root of the project, run `mvn install -DskipTests` to install Comet.
-
-Then build the fuzz-testing jar:
-
-```shell
-mvn package
-```
-
-Set appropriate values for the `SPARK_HOME`, `SPARK_MASTER`, and `COMET_JAR` environment variables, then use
-`spark-submit` to run CometFuzz against a Spark cluster.
-
-### Generating Data Files
-
-```shell
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --class org.apache.comet.fuzz.Main \
- target/comet-fuzz-spark3.5_2.12-0.16.0-SNAPSHOT-jar-with-dependencies.jar \
- data --num-files=2 --num-rows=200 --exclude-negative-zero --generate-arrays --generate-structs --generate-maps
-```
-
-There is an optional `--exclude-negative-zero` flag for excluding `-0.0` from the generated data. This is
-sometimes useful because Comet is already known to behave differently from Spark for this edge case, due to
-differences between how Rust and Java handle this value.
-
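-For example, `-0.0` equals `0.0` under IEEE 754 arithmetic, but Java's total
-ordering (`java.lang.Double.compare`) treats the two as distinct, so results
-can diverge when sorting, grouping, or dividing. A minimal illustration in Scala:
-
-```scala
--0.0 == 0.0                          // true: equal in arithmetic comparisons
-java.lang.Double.compare(-0.0, 0.0)  // -1: distinct under Java's total ordering
-1.0 / -0.0                           // -Infinity, whereas 1.0 / 0.0 is Infinity
-```
-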
-### Generating Queries
-
-Generate random queries based on the available test files:
-
-```shell
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --class org.apache.comet.fuzz.Main \
- target/comet-fuzz-spark3.5_2.12-0.16.0-SNAPSHOT-jar-with-dependencies.jar \
- queries --num-files=2 --num-queries=500
-```
-
-Note that the output filename is currently hard-coded as `queries.sql`.
-
-### Execute Queries
-
-```shell
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16G \
- --conf spark.plugins=org.apache.spark.CometPlugin \
- --conf spark.comet.enabled=true \
- --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
- --conf spark.comet.exec.shuffle.enabled=true \
- --jars $COMET_JAR \
- --conf spark.driver.extraClassPath=$COMET_JAR \
- --conf spark.executor.extraClassPath=$COMET_JAR \
- --class org.apache.comet.fuzz.Main \
- target/comet-fuzz-spark3.5_2.12-0.16.0-SNAPSHOT-jar-with-dependencies.jar \
- run --num-files=2 --filename=queries.sql
-```
-
-Note that the output filename is currently hard-coded as `results-${System.currentTimeMillis()}.md`.
-
-### Compare existing datasets
-
-To compare a pair of existing datasets, you can use the comparison tool.
-The example below compares TPC-H query results generated by plain Spark and by Comet.
-
-```shell
-$SPARK_HOME/bin/spark-submit \
- --master $SPARK_MASTER \
- --class org.apache.comet.fuzz.ComparisonTool \
- target/comet-fuzz-spark3.5_2.12-0.16.0-SNAPSHOT-jar-with-dependencies.jar \
- compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet
-```
-
-The tool takes a pair of folders with the same layout and compares the corresponding subfolders, treating each one as a Parquet-based dataset.
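-
-For example, given a layout such as (paths are illustrative):
-
-```
-/tmp/tpch/spark/q1/part-0.parquet
-/tmp/tpch/comet/q1/part-0.parquet
-```
-
-the tool compares `spark/q1` against `comet/q1`, and likewise for each remaining subfolder.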
diff --git a/fuzz-testing/pom.xml b/fuzz-testing/pom.xml
deleted file mode 100644
index f4f1d4ef2b..0000000000
--- a/fuzz-testing/pom.xml
+++ /dev/null
@@ -1,126 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements. See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership. The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied. See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-
-  <parent>
-    <groupId>org.apache.datafusion</groupId>
-    <artifactId>comet-parent-spark${spark.version.short}_${scala.binary.version}</artifactId>
-    <version>0.16.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>comet-fuzz-spark${spark.version.short}_${scala.binary.version}</artifactId>
-  <name>comet-fuzz</name>
-  <url>http://maven.apache.org</url>
-  <packaging>jar</packaging>
-
-  <properties>
-    <maven.deploy.skip>false</maven.deploy.skip>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.scala-lang</groupId>
-      <artifactId>scala-library</artifactId>
-      <version>${scala.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.spark</groupId>
-      <artifactId>spark-sql_${scala.binary.version}</artifactId>
-      <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.scala-lang.modules</groupId>
-          <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.datafusion</groupId>
-      <artifactId>comet-spark-spark${spark.version.short}_${scala.binary.version}</artifactId>
-      <version>${project.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>org.scala-lang.modules</groupId>
-          <artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.rogach</groupId>
-      <artifactId>scallop_${scala.binary.version}</artifactId>
-    </dependency>
-  </dependencies>
-
-  <build>
-    <sourceDirectory>src/main/scala</sourceDirectory>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>${maven-compiler-plugin.version}</version>
-        <configuration>
-          <source>${java.version}</source>
-          <target>${java.version}</target>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>net.alchim31.maven</groupId>
-        <artifactId>scala-maven-plugin</artifactId>
-        <version>${scala.plugin.version}</version>
-        <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-              <goal>testCompile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <version>${maven-assembly-plugin.version}</version>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-install-plugin</artifactId>
-        <configuration>
-          <skip>true</skip>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
-</project>
diff --git a/fuzz-testing/run.sh b/fuzz-testing/run.sh
deleted file mode 100755
index 27391249c2..0000000000
--- a/fuzz-testing/run.sh
+++ /dev/null
@@ -1,90 +0,0 @@
-#!/bin/bash
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-# Usage: ./run.sh
-# Builds necessary JARs, generates data and queries, and runs fuzz tests for Comet Spark.
-# Environment variables:
-# SPARK_HOME - path to Spark installation
-# SPARK_MASTER - Spark master URL (default: local[*])
-# SCALA_MAJOR_VERSION - Scala major version to use (default: 2.12)
-# SPARK_MAJOR_VERSION - Spark major version to use (default: 3.5)
-# NUM_FILES - number of data files to generate (default: 2)
-# NUM_ROWS - number of rows per file (default: 200)
-# NUM_QUERIES - number of queries to generate (default: 500)
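-#
-# Example invocation (paths are illustrative):
-#   SPARK_HOME=/opt/spark NUM_QUERIES=1000 ./run.sh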
-
-set -eux
-
-DIR="$(cd "$(dirname "$0")" && pwd)"
-PARENT_DIR="${DIR}/.."
-MVN_CMD="${PARENT_DIR}/mvnw"
-SPARK_MASTER="${SPARK_MASTER:-local[*]}"
-SCALA_MAJOR_VERSION="${SCALA_MAJOR_VERSION:-2.12}"
-SPARK_MAJOR_VERSION="${SPARK_MAJOR_VERSION:-3.5}"
-PROFILES="-Pscala-${SCALA_MAJOR_VERSION},spark-${SPARK_MAJOR_VERSION}"
-PROJECT_VERSION=$("${MVN_CMD}" -f "${DIR}/pom.xml" -q help:evaluate -Dexpression=project.version -DforceStdout)
-COMET_SPARK_JAR="${PARENT_DIR}/spark/target/comet-spark${SPARK_MAJOR_VERSION}_${SCALA_MAJOR_VERSION}-${PROJECT_VERSION}.jar"
-COMET_FUZZ_JAR="${DIR}/target/comet-fuzz-spark${SPARK_MAJOR_VERSION}_${SCALA_MAJOR_VERSION}-${PROJECT_VERSION}-jar-with-dependencies.jar"
-NUM_FILES="${NUM_FILES:-2}"
-NUM_ROWS="${NUM_ROWS:-200}"
-NUM_QUERIES="${NUM_QUERIES:-500}"
-
-if [ ! -f "${COMET_SPARK_JAR}" ]; then
- echo "Building Comet Spark jar..."
- pushd "${PARENT_DIR}"
- PROFILES="${PROFILES}" make
- popd
-else
- echo "Building Fuzz testing jar..."
- "${MVN_CMD}" -f "${DIR}/pom.xml" package -DskipTests "${PROFILES}"
-fi
-
-echo "Generating data..."
-"${SPARK_HOME}/bin/spark-submit" \
- --master "${SPARK_MASTER}" \
- --class org.apache.comet.fuzz.Main \
- "${COMET_FUZZ_JAR}" \
- data --num-files="${NUM_FILES}" --num-rows="${NUM_ROWS}" \
- --exclude-negative-zero \
- --generate-arrays --generate-structs --generate-maps
-
-echo "Generating queries..."
-"${SPARK_HOME}/bin/spark-submit" \
- --master "${SPARK_MASTER}" \
- --class org.apache.comet.fuzz.Main \
- "${COMET_FUZZ_JAR}" \
- queries --num-files="${NUM_FILES}" --num-queries="${NUM_QUERIES}"
-
-echo "Running fuzz tests..."
-"${SPARK_HOME}/bin/spark-submit" \
- --master "${SPARK_MASTER}" \
- --conf spark.memory.offHeap.enabled=true \
- --conf spark.memory.offHeap.size=16G \
- --conf spark.plugins=org.apache.spark.CometPlugin \
- --conf spark.comet.enabled=true \
- --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
- --conf spark.comet.exec.shuffle.enabled=true \
- --jars "${COMET_SPARK_JAR}" \
- --conf spark.driver.extraClassPath="${COMET_SPARK_JAR}" \
- --conf spark.executor.extraClassPath="${COMET_SPARK_JAR}" \
- --class org.apache.comet.fuzz.Main \
- "${COMET_FUZZ_JAR}" \
- run --num-files="${NUM_FILES}" --filename="queries.sql"
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala
deleted file mode 100644
index 055ea6553f..0000000000
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.fuzz
-
-import java.io.File
-
-import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand}
-
-import org.apache.spark.sql.{functions, SparkSession}
-
-class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) {
- object compareParquet extends Subcommand("compareParquet") {
- val inputSparkFolder: ScallopOption[String] =
- opt[String](required = true, descr = "Folder with Spark-produced results in Parquet format")
- val inputCometFolder: ScallopOption[String] =
- opt[String](required = true, descr = "Folder with Comet-produced results in Parquet format")
- val tolerance: ScallopOption[Double] =
- opt[Double](default = Some(0.000002), descr = "Tolerance for floating point comparisons")
- }
- addSubcommand(compareParquet)
- verify()
-}
-
-object ComparisonTool {
-
- lazy val spark: SparkSession = SparkSession
- .builder()
- .getOrCreate()
-
- def main(args: Array[String]): Unit = {
- val conf = new ComparisonToolConf(args.toIndexedSeq)
- conf.subcommand match {
- case Some(conf.compareParquet) =>
- compareParquetFolders(
- spark,
- conf.compareParquet.inputSparkFolder(),
- conf.compareParquet.inputCometFolder(),
- conf.compareParquet.tolerance())
-
- case _ =>
- // scalastyle:off println
- println("Invalid subcommand")
- // scalastyle:on println
- sys.exit(-1)
- }
- }
-
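- /** Compares each subfolder of the Spark results folder against the subfolder of
- the same name under the Comet results folder, writing findings to a results file. */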
- private def compareParquetFolders(
- spark: SparkSession,
- sparkFolderPath: String,
- cometFolderPath: String,
- tolerance: Double): Unit = {
-
- val output = QueryRunner.createOutputMdFile()
-
- try {
- val sparkFolder = new File(sparkFolderPath)
- val cometFolder = new File(cometFolderPath)
-
- if (!sparkFolder.exists() || !sparkFolder.isDirectory) {
- throw new IllegalArgumentException(
- s"Spark folder does not exist or is not a directory: $sparkFolderPath")
- }
-
- if (!cometFolder.exists() || !cometFolder.isDirectory) {
- throw new IllegalArgumentException(
- s"Comet folder does not exist or is not a directory: $cometFolderPath")
- }
-
- // Get all subdirectories from the Spark folder
- val sparkSubfolders = sparkFolder
- .listFiles()
- .filter(_.isDirectory)
- .map(_.getName)
- .sorted
-
- output.write("# Comparing Parquet Folders\n\n")
- output.write(s"Spark folder: $sparkFolderPath\n")
- output.write(s"Comet folder: $cometFolderPath\n")
- output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n")
-
- // Compare each subfolder
- sparkSubfolders.foreach { subfolderName =>
- val sparkSubfolderPath = new File(sparkFolder, subfolderName)
- val cometSubfolderPath = new File(cometFolder, subfolderName)
-
- if (!cometSubfolderPath.exists() || !cometSubfolderPath.isDirectory) {
- output.write(s"## Subfolder: $subfolderName\n")
- output.write(
- s"[WARNING] Comet subfolder not found: ${cometSubfolderPath.getAbsolutePath}\n\n")
- } else {
- output.write(s"## Comparing subfolder: $subfolderName\n\n")
-
- try {
- // Read Spark parquet files
- spark.conf.set("spark.comet.enabled", "false")
- val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath)
- val sparkRows = sparkDf.orderBy(sparkDf.columns.map(functions.col): _*).collect()
-
- // Read Comet parquet files
- val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath)
- val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect()
-
- // Compare the results
- if (QueryComparison.assertSameRows(sparkRows, cometRows, output, tolerance)) {
- output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")
- } else {
- // Output schema if dataframes are not equal
- QueryComparison.showSchema(
- output,
- sparkDf.schema.treeString,
- cometDf.schema.treeString)
- }
- } catch {
- case e: Exception =>
- output.write(
- s"[ERROR] Failed to compare subfolder $subfolderName: ${e.getMessage}\n")
- val sw = new java.io.StringWriter()
- val p = new java.io.PrintWriter(sw)
- e.printStackTrace(p)
- p.close()
- output.write(s"```\n${sw.toString}\n```\n\n")
- }
- }
-
- output.flush()
- }
-
- output.write("\n# Comparison Complete\n")
- output.write(s"Compared ${sparkSubfolders.length} subfolders\n")
-
- } finally {
- output.close()
- }
- }
-}
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala
deleted file mode 100644
index ced2ad28c4..0000000000
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.fuzz
-
-import scala.util.Random
-
-import org.rogach.scallop.{ScallopConf, Subcommand}
-import org.rogach.scallop.ScallopOption
-
-import org.apache.spark.sql.SparkSession
-
-import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions}
-
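-/** Command-line entry point with three subcommands: `data` generates random Parquet
- * files, `queries` generates random SQL, and `run` executes the queries against Spark with Comet disabled and then enabled. */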
-class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
- object generateData extends Subcommand("data") {
- val numFiles: ScallopOption[Int] =
- opt[Int](required = true, descr = "Number of files to generate")
- val numRows: ScallopOption[Int] = opt[Int](required = true, descr = "Number of rows per file")
- val randomSeed: ScallopOption[Long] =
- opt[Long](required = false, descr = "Random seed to use")
- val generateArrays: ScallopOption[Boolean] =
- opt[Boolean](required = false, descr = "Whether to generate arrays")
- val generateStructs: ScallopOption[Boolean] =
- opt[Boolean](required = false, descr = "Whether to generate structs")
- val generateMaps: ScallopOption[Boolean] =
- opt[Boolean](required = false, descr = "Whether to generate maps")
- val excludeNegativeZero: ScallopOption[Boolean] =
- opt[Boolean](required = false, descr = "Whether to exclude negative zero")
- }
- addSubcommand(generateData)
- object generateQueries extends Subcommand("queries") {
- val numFiles: ScallopOption[Int] =
- opt[Int](required = true, descr = "Number of input files to use")
- val numQueries: ScallopOption[Int] =
- opt[Int](required = true, descr = "Number of queries to generate")
- val randomSeed: ScallopOption[Long] =
- opt[Long](required = false, descr = "Random seed to use")
- }
- addSubcommand(generateQueries)
- object runQueries extends Subcommand("run") {
- val filename: ScallopOption[String] =
- opt[String](required = true, descr = "File to write queries to")
- val numFiles: ScallopOption[Int] =
- opt[Int](required = true, descr = "Number of input files to use")
- val showFailedSparkQueries: ScallopOption[Boolean] =
- opt[Boolean](required = false, descr = "Whether to show failed Spark queries")
- }
- addSubcommand(runQueries)
- verify()
-}
-
-object Main {
-
- lazy val spark: SparkSession = SparkSession
- .builder()
- .getOrCreate()
-
- def main(args: Array[String]): Unit = {
- val conf = new Conf(args.toIndexedSeq)
- conf.subcommand match {
- case Some(conf.generateData) =>
- val r = conf.generateData.randomSeed.toOption match {
- case Some(seed) => new Random(seed)
- case None => new Random()
- }
- for (i <- 0 until conf.generateData.numFiles()) {
- ParquetGenerator.makeParquetFile(
- r,
- spark,
- s"test$i.parquet",
- numRows = conf.generateData.numRows(),
- SchemaGenOptions(
- generateArray = conf.generateData.generateArrays(),
- generateStruct = conf.generateData.generateStructs(),
- generateMap = conf.generateData.generateMaps(),
- // create two columns of each primitive type so that they can be used in binary
- // expressions such as `a + b` and `a < b`
- primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes ++
- SchemaGenOptions.defaultPrimitiveTypes),
- DataGenOptions(
- allowNull = true,
- generateNegativeZero = !conf.generateData.excludeNegativeZero()))
- }
- case Some(conf.generateQueries) =>
- val r = conf.generateQueries.randomSeed.toOption match {
- case Some(seed) => new Random(seed)
- case None => new Random()
- }
- QueryGen.generateRandomQueries(
- r,
- spark,
- numFiles = conf.generateQueries.numFiles(),
- conf.generateQueries.numQueries())
- case Some(conf.runQueries) =>
- QueryRunner.runQueries(
- spark,
- conf.runQueries.numFiles(),
- conf.runQueries.filename(),
- conf.runQueries.showFailedSparkQueries())
- case _ =>
- // scalastyle:off println
- println("Invalid subcommand")
- // scalastyle:on println
- sys.exit(-1)
- }
- }
-}
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala
deleted file mode 100644
index d390e86aee..0000000000
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala
+++ /dev/null
@@ -1,391 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.fuzz
-
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.types.DataTypes
-
-sealed trait SparkType
-case class SparkTypeOneOf(dataTypes: Seq[SparkType]) extends SparkType
-case object SparkBooleanType extends SparkType
-case object SparkBinaryType extends SparkType
-case object SparkStringType extends SparkType
-case object SparkIntegralType extends SparkType
-case object SparkByteType extends SparkType
-case object SparkShortType extends SparkType
-case object SparkIntType extends SparkType
-case object SparkLongType extends SparkType
-case object SparkFloatType extends SparkType
-case object SparkDoubleType extends SparkType
-case class SparkDecimalType(p: Int, s: Int) extends SparkType
-case object SparkNumericType extends SparkType
-case object SparkDateType extends SparkType
-case object SparkTimestampType extends SparkType
-case object SparkDateOrTimestampType extends SparkType
-case class SparkArrayType(elementType: SparkType) extends SparkType
-case class SparkMapType(keyType: SparkType, valueType: SparkType) extends SparkType
-case class SparkStructType(fields: Seq[SparkType]) extends SparkType
-case object SparkAnyType extends SparkType
-
-case class FunctionSignature(inputTypes: Seq[SparkType], varArgs: Boolean = false)
-
-case class Function(name: String, signatures: Seq[FunctionSignature])
-
-object Meta {
-
- val primitiveSparkTypes: Seq[SparkType] = Seq(
- SparkBooleanType,
- SparkBinaryType,
- SparkStringType,
- SparkByteType,
- SparkShortType,
- SparkIntType,
- SparkLongType,
- SparkFloatType,
- SparkDoubleType,
- SparkDateType,
- SparkTimestampType)
-
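- // (DataType, relative weight) pairs, sampled via Utils.randomWeightedChoice when
- // choosing the target type for generated CAST expressions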
- val dataTypes: Seq[(DataType, Double)] = Seq(
- (DataTypes.BooleanType, 0.1),
- (DataTypes.ByteType, 0.2),
- (DataTypes.ShortType, 0.2),
- (DataTypes.IntegerType, 0.2),
- (DataTypes.LongType, 0.2),
- (DataTypes.FloatType, 0.2),
- (DataTypes.DoubleType, 0.2),
- (DataTypes.createDecimalType(10, 2), 0.2),
- (DataTypes.DateType, 0.2),
- (DataTypes.TimestampType, 0.2),
- (DataTypes.TimestampNTZType, 0.2),
- (DataTypes.StringType, 0.2),
- (DataTypes.BinaryType, 0.1))
-
- private def createFunctionWithInputTypes(
- name: String,
- inputs: Seq[SparkType],
- varArgs: Boolean = false): Function = {
- createFunctions(name, Seq(FunctionSignature(inputs, varArgs)))
- }
-
- private def createFunctions(name: String, signatures: Seq[FunctionSignature]): Function = {
- signatures.foreach { s =>
- assert(
- !s.varArgs || s.inputTypes.length == 1,
- s"Variadic function $s must have exactly one input type")
- }
- Function(name, signatures)
- }
-
- private def createUnaryStringFunction(name: String): Function = {
- createFunctionWithInputTypes(name, Seq(SparkStringType))
- }
-
- private def createUnaryNumericFunction(name: String): Function = {
- createFunctionWithInputTypes(name, Seq(SparkNumericType))
- }
-
- // Math expressions (corresponds to mathExpressions in QueryPlanSerde)
- val mathScalarFunc: Seq[Function] = Seq(
- createUnaryNumericFunction("abs"),
- createUnaryNumericFunction("acos"),
- createUnaryNumericFunction("asin"),
- createUnaryNumericFunction("atan"),
- createFunctionWithInputTypes("atan2", Seq(SparkNumericType, SparkNumericType)),
- createUnaryNumericFunction("cos"),
- createUnaryNumericFunction("cosh"),
- createUnaryNumericFunction("exp"),
- createUnaryNumericFunction("expm1"),
- createFunctionWithInputTypes("log", Seq(SparkNumericType, SparkNumericType)),
- createUnaryNumericFunction("log10"),
- createUnaryNumericFunction("log2"),
- createFunctionWithInputTypes("pow", Seq(SparkNumericType, SparkNumericType)),
- createFunctionWithInputTypes("remainder", Seq(SparkNumericType, SparkNumericType)),
- createFunctions(
- "round",
- Seq(
- FunctionSignature(Seq(SparkNumericType)),
- FunctionSignature(Seq(SparkNumericType, SparkIntType)))),
- createUnaryNumericFunction("signum"),
- createUnaryNumericFunction("sin"),
- createUnaryNumericFunction("sinh"),
- createUnaryNumericFunction("sqrt"),
- createUnaryNumericFunction("tan"),
- createUnaryNumericFunction("tanh"),
- createUnaryNumericFunction("cot"),
- createUnaryNumericFunction("ceil"),
- createUnaryNumericFunction("floor"),
- createFunctionWithInputTypes("unary_minus", Seq(SparkNumericType)))
-
- // Hash expressions (corresponds to hashExpressions in QueryPlanSerde)
- val hashScalarFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes("md5", Seq(SparkAnyType)),
- createFunctionWithInputTypes("murmur3_hash", Seq(SparkAnyType)), // TODO variadic
- createFunctionWithInputTypes("sha2", Seq(SparkAnyType, SparkIntType)))
-
- // String expressions (corresponds to stringExpressions in QueryPlanSerde)
- val stringScalarFunc: Seq[Function] = Seq(
- createUnaryStringFunction("ascii"),
- createUnaryStringFunction("bit_length"),
- createUnaryStringFunction("chr"),
- createFunctionWithInputTypes(
- "concat",
- Seq(
- SparkTypeOneOf(
- Seq(
- SparkStringType,
- SparkBinaryType,
- SparkArrayType(SparkStringType),
- SparkArrayType(SparkNumericType),
- SparkArrayType(SparkBinaryType)))),
- varArgs = true),
- createFunctionWithInputTypes("concat_ws", Seq(SparkStringType, SparkStringType)),
- createFunctionWithInputTypes("contains", Seq(SparkStringType, SparkStringType)),
- createFunctionWithInputTypes("ends_with", Seq(SparkStringType, SparkStringType)),
- createFunctionWithInputTypes(
- "hex",
- Seq(SparkTypeOneOf(Seq(SparkStringType, SparkBinaryType, SparkIntType, SparkLongType)))),
- createUnaryStringFunction("init_cap"),
- createFunctionWithInputTypes("instr", Seq(SparkStringType, SparkStringType)),
- createFunctionWithInputTypes(
- "length",
- Seq(SparkTypeOneOf(Seq(SparkStringType, SparkBinaryType)))),
- createFunctionWithInputTypes("like", Seq(SparkStringType, SparkStringType)),
- createUnaryStringFunction("lower"),
- createFunctions(
- "lpad",
- Seq(
- FunctionSignature(Seq(SparkStringType, SparkIntegralType)),
- FunctionSignature(Seq(SparkStringType, SparkIntegralType, SparkStringType)))),
- createUnaryStringFunction("ltrim"),
- createUnaryStringFunction("octet_length"),
- createFunctions(
- "regexp_replace",
- Seq(
- FunctionSignature(Seq(SparkStringType, SparkStringType, SparkStringType)),
- FunctionSignature(Seq(SparkStringType, SparkStringType, SparkStringType, SparkIntType)))),
- createFunctionWithInputTypes("repeat", Seq(SparkStringType, SparkIntType)),
- createFunctions(
- "replace",
- Seq(
- FunctionSignature(Seq(SparkStringType, SparkStringType)),
- FunctionSignature(Seq(SparkStringType, SparkStringType, SparkStringType)))),
- createFunctions(
- "reverse",
- Seq(
- FunctionSignature(Seq(SparkStringType)),
- FunctionSignature(Seq(SparkArrayType(SparkAnyType))))),
- createFunctionWithInputTypes("rlike", Seq(SparkStringType, SparkStringType)),
- createFunctions(
- "rpad",
- Seq(
- FunctionSignature(Seq(SparkStringType, SparkIntegralType)),
- FunctionSignature(Seq(SparkStringType, SparkIntegralType, SparkStringType)))),
- createUnaryStringFunction("rtrim"),
- createFunctions(
- "split",
- Seq(
- FunctionSignature(Seq(SparkStringType, SparkStringType)),
- FunctionSignature(Seq(SparkStringType, SparkStringType, SparkIntType)))),
- createFunctionWithInputTypes("starts_with", Seq(SparkStringType, SparkStringType)),
- createFunctionWithInputTypes("string_space", Seq(SparkIntType)),
- createFunctionWithInputTypes("substring", Seq(SparkStringType, SparkIntType, SparkIntType)),
- createFunctionWithInputTypes("translate", Seq(SparkStringType, SparkStringType)),
- createUnaryStringFunction("trim"),
- createUnaryStringFunction("btrim"),
- createUnaryStringFunction("unhex"),
- createUnaryStringFunction("upper"),
- createFunctionWithInputTypes("xxhash64", Seq(SparkAnyType)), // TODO variadic
- createFunctionWithInputTypes("sha1", Seq(SparkAnyType)))
-
- // Conditional expressions (corresponds to conditionalExpressions in QueryPlanSerde)
- val conditionalScalarFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes("if", Seq(SparkBooleanType, SparkAnyType, SparkAnyType)))
-
- // Map expressions (corresponds to mapExpressions in QueryPlanSerde)
- val mapScalarFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes(
- "map_extract",
- Seq(SparkMapType(SparkAnyType, SparkAnyType), SparkAnyType)),
- createFunctionWithInputTypes("map_keys", Seq(SparkMapType(SparkAnyType, SparkAnyType))),
- createFunctionWithInputTypes("map_entries", Seq(SparkMapType(SparkAnyType, SparkAnyType))),
- createFunctionWithInputTypes("map_values", Seq(SparkMapType(SparkAnyType, SparkAnyType))),
- createFunctionWithInputTypes(
- "map_from_arrays",
- Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes(
- "map_from_entries",
- Seq(
- SparkArrayType(
- SparkStructType(Seq(
- SparkTypeOneOf(primitiveSparkTypes.filterNot(_ == SparkBinaryType)),
- SparkTypeOneOf(primitiveSparkTypes.filterNot(_ == SparkBinaryType))))))))
-
- // Predicate expressions (corresponds to predicateExpressions in QueryPlanSerde)
- val predicateScalarFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes("and", Seq(SparkBooleanType, SparkBooleanType)),
- createFunctionWithInputTypes("or", Seq(SparkBooleanType, SparkBooleanType)),
- createFunctionWithInputTypes("not", Seq(SparkBooleanType)),
- createFunctionWithInputTypes("in", Seq(SparkAnyType, SparkAnyType))
- ) // TODO: variadic
-
- // Struct expressions (corresponds to structExpressions in QueryPlanSerde)
- val structScalarFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes(
- "create_named_struct",
- Seq(SparkStringType, SparkAnyType)
- ), // TODO: variadic name/value pairs
- createFunctionWithInputTypes(
- "get_struct_field",
- Seq(SparkStructType(Seq(SparkAnyType)), SparkStringType)))
-
- // Bitwise expressions (corresponds to bitwiseExpressions in QueryPlanSerde)
- val bitwiseScalarFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes("bitwise_and", Seq(SparkIntegralType, SparkIntegralType)),
- createFunctionWithInputTypes("bitwise_count", Seq(SparkIntegralType)),
- createFunctionWithInputTypes("bitwise_get", Seq(SparkIntegralType, SparkIntType)),
- createFunctionWithInputTypes("bitwise_or", Seq(SparkIntegralType, SparkIntegralType)),
- createFunctionWithInputTypes("bitwise_not", Seq(SparkIntegralType)),
- createFunctionWithInputTypes("bitwise_xor", Seq(SparkIntegralType, SparkIntegralType)),
- createFunctionWithInputTypes("shift_left", Seq(SparkIntegralType, SparkIntType)),
- createFunctionWithInputTypes("shift_right", Seq(SparkIntegralType, SparkIntType)))
-
- // Misc expressions (corresponds to miscExpressions in QueryPlanSerde)
- val miscScalarFunc: Seq[Function] =
- Seq(
- createFunctionWithInputTypes("isnan", Seq(SparkNumericType)),
- createFunctionWithInputTypes("isnull", Seq(SparkAnyType)),
- createFunctionWithInputTypes("isnotnull", Seq(SparkAnyType)),
- createFunctionWithInputTypes("coalesce", Seq(SparkAnyType, SparkAnyType))
- ) // TODO: variadic
-
- // Array expressions (corresponds to arrayExpressions in QueryPlanSerde)
- val arrayScalarFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes("array_append", Seq(SparkArrayType(SparkAnyType), SparkAnyType)),
- createFunctionWithInputTypes("array_compact", Seq(SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes(
- "array_contains",
- Seq(SparkArrayType(SparkAnyType), SparkAnyType)),
- createFunctionWithInputTypes("array_distinct", Seq(SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes(
- "array_except",
- Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes(
- "array_insert",
- Seq(SparkArrayType(SparkAnyType), SparkIntType, SparkAnyType)),
- createFunctionWithInputTypes(
- "array_intersect",
- Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))),
- createFunctions(
- "array_join",
- Seq(
- FunctionSignature(Seq(SparkArrayType(SparkAnyType), SparkStringType)),
- FunctionSignature(Seq(SparkArrayType(SparkAnyType), SparkStringType, SparkStringType)))),
- createFunctionWithInputTypes("array_max", Seq(SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes("array_min", Seq(SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes("array_remove", Seq(SparkArrayType(SparkAnyType), SparkAnyType)),
- createFunctionWithInputTypes("array_repeat", Seq(SparkAnyType, SparkIntType)),
- createFunctionWithInputTypes(
- "arrays_overlap",
- Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes(
- "array_union",
- Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))),
- createFunctionWithInputTypes("array", Seq(SparkAnyType, SparkAnyType)), // TODO: variadic
- createFunctionWithInputTypes(
- "element_at",
- Seq(
- SparkTypeOneOf(
- Seq(SparkArrayType(SparkAnyType), SparkMapType(SparkAnyType, SparkAnyType))),
- SparkAnyType)),
- createFunctionWithInputTypes("flatten", Seq(SparkArrayType(SparkArrayType(SparkAnyType)))),
- createFunctionWithInputTypes(
- "get_array_item",
- Seq(SparkArrayType(SparkAnyType), SparkIntType)))
-
- // Temporal expressions (corresponds to temporalExpressions in QueryPlanSerde)
- val temporalScalarFunc: Seq[Function] =
- Seq(
- createFunctionWithInputTypes("date_add", Seq(SparkDateType, SparkIntType)),
- createFunctionWithInputTypes("date_sub", Seq(SparkDateType, SparkIntType)),
- createFunctions(
- "from_unixtime",
- Seq(
- FunctionSignature(Seq(SparkLongType)),
- FunctionSignature(Seq(SparkLongType, SparkStringType)))),
- createFunctionWithInputTypes("hour", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("minute", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("second", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("trunc", Seq(SparkDateOrTimestampType, SparkStringType)),
- createFunctionWithInputTypes("year", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("month", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("day", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("dayofmonth", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("dayofweek", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("weekday", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("dayofyear", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("weekofyear", Seq(SparkDateOrTimestampType)),
- createFunctionWithInputTypes("quarter", Seq(SparkDateOrTimestampType)))
-
- // Combined in same order as exprSerdeMap in QueryPlanSerde
- val scalarFunc: Seq[Function] = mathScalarFunc ++ hashScalarFunc ++ stringScalarFunc ++
- conditionalScalarFunc ++ mapScalarFunc ++ predicateScalarFunc ++
- structScalarFunc ++ bitwiseScalarFunc ++ miscScalarFunc ++ arrayScalarFunc ++
- temporalScalarFunc
-
- val aggFunc: Seq[Function] = Seq(
- createFunctionWithInputTypes("min", Seq(SparkAnyType)),
- createFunctionWithInputTypes("max", Seq(SparkAnyType)),
- createFunctionWithInputTypes("count", Seq(SparkAnyType)),
- createUnaryNumericFunction("avg"),
- createUnaryNumericFunction("sum"),
- // first/last are non-deterministic and known to be incompatible with Spark
-// createFunctionWithInputTypes("first", Seq(SparkAnyType)),
-// createFunctionWithInputTypes("last", Seq(SparkAnyType)),
- createUnaryNumericFunction("var_pop"),
- createUnaryNumericFunction("var_samp"),
- createFunctionWithInputTypes("covar_pop", Seq(SparkNumericType, SparkNumericType)),
- createFunctionWithInputTypes("covar_samp", Seq(SparkNumericType, SparkNumericType)),
- createUnaryNumericFunction("stddev_pop"),
- createUnaryNumericFunction("stddev_samp"),
- createFunctionWithInputTypes("corr", Seq(SparkNumericType, SparkNumericType)),
- createFunctionWithInputTypes("bit_and", Seq(SparkIntegralType)),
- createFunctionWithInputTypes("bit_or", Seq(SparkIntegralType)),
- createFunctionWithInputTypes("bit_xor", Seq(SparkIntegralType)))
-
- val unaryArithmeticOps: Seq[String] = Seq("+", "-")
-
- val binaryArithmeticOps: Seq[String] =
- Seq("+", "-", "*", "/", "%", "&", "|", "^", "<<", ">>", "div")
-
- val comparisonOps: Seq[String] = Seq("=", "<=>", ">", ">=", "<", "<=")
-
- // TODO make this more comprehensive
- val comparisonTypes: Seq[SparkType] = Seq(
- SparkStringType,
- SparkBinaryType,
- SparkNumericType,
- SparkDateType,
- SparkTimestampType,
- SparkArrayType(SparkTypeOneOf(Seq(SparkStringType, SparkNumericType, SparkDateType))))
-
-}
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala
deleted file mode 100644
index 6d404e6548..0000000000
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.fuzz
-
-import java.io.{BufferedWriter, FileWriter}
-
-import scala.annotation.tailrec
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.types._
-
-object QueryGen {
-
- def generateRandomQueries(
- r: Random,
- spark: SparkSession,
- numFiles: Int,
- numQueries: Int): Unit = {
- for (i <- 0 until numFiles) {
- spark.read.parquet(s"test$i.parquet").createTempView(s"test$i")
- }
-
- val w = new BufferedWriter(new FileWriter("queries.sql"))
-
- val uniqueQueries = mutable.HashSet[String]()
-
- for (_ <- 0 until numQueries) {
- try {
- val sql = r.nextInt(8) match {
- case 0 => generateJoin(r, spark, numFiles)
- case 1 => generateAggregate(r, spark, numFiles)
- case 2 => generateScalar(r, spark, numFiles)
- case 3 => generateCast(r, spark, numFiles)
- case 4 => generateUnaryArithmetic(r, spark, numFiles)
- case 5 => generateBinaryArithmetic(r, spark, numFiles)
- case 6 => generateBinaryComparison(r, spark, numFiles)
- case _ => generateConditional(r, spark, numFiles)
- }
- if (!uniqueQueries.contains(sql)) {
- uniqueQueries += sql
- w.write(sql + "\n")
- }
- } catch {
- case e: Exception =>
- // scalastyle:off println
- println(s"Failed to generate query: ${e.getMessage}")
- // scalastyle:on println
- }
- }
- w.close()
- }
-
- private def generateAggregate(r: Random, spark: SparkSession, numFiles: Int): String = {
- val tableName = s"test${r.nextInt(numFiles)}"
- val table = spark.table(tableName)
-
- val func = Utils.randomChoice(Meta.aggFunc, r)
- try {
- val signature = Utils.randomChoice(func.signatures, r)
- val args = signature.inputTypes.map(x => pickRandomColumn(r, table, x))
-
- val groupingCols = Range(0, 2).map(_ => Utils.randomChoice(table.columns, r))
-
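- // Example SELECT c1, c2, max(c0) FROM test0 GROUP BY c1,c2 ORDER BY c1, c2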
- if (groupingCols.isEmpty) {
- s"SELECT ${args.mkString(", ")}, ${func.name}(${args.mkString(", ")}) AS x " +
- s"FROM $tableName " +
- s"ORDER BY ${args.mkString(", ")};"
- } else {
- s"SELECT ${groupingCols.mkString(", ")}, ${func.name}(${args.mkString(", ")}) " +
- s"FROM $tableName " +
- s"GROUP BY ${groupingCols.mkString(",")} " +
- s"ORDER BY ${groupingCols.mkString(", ")};"
- }
- } catch {
- case e: Exception =>
- throw new IllegalStateException(
- s"Failed to generate SQL for aggregate function ${func.name}",
- e)
- }
- }
-
- private def generateScalar(r: Random, spark: SparkSession, numFiles: Int): String = {
- val tableName = s"test${r.nextInt(numFiles)}"
- val table = spark.table(tableName)
-
- val func = Utils.randomChoice(Meta.scalarFunc, r)
- try {
- val signature = Utils.randomChoice(func.signatures, r)
- val args =
- if (signature.varArgs) {
- pickRandomColumns(r, table, signature.inputTypes.head)
- } else {
- signature.inputTypes.map(x => pickRandomColumn(r, table, x))
- }
-
- // Example SELECT c0, log(c0) as x FROM test0
- s"SELECT ${args.mkString(", ")}, ${func.name}(${args.mkString(", ")}) AS x " +
- s"FROM $tableName " +
- s"ORDER BY ${args.mkString(", ")};"
- } catch {
- case e: Exception =>
- throw new IllegalStateException(
- s"Failed to generate SQL for scalar function ${func.name}",
- e)
- }
- }
-
- @tailrec
- private def pickRandomColumns(r: Random, df: DataFrame, targetType: SparkType): Seq[String] = {
- targetType match {
- case SparkTypeOneOf(choices) =>
- val chosenType = Utils.randomChoice(choices, r)
- pickRandomColumns(r, df, chosenType)
- case _ =>
- var columns = Set.empty[String]
- for (_ <- 0 to r.nextInt(df.columns.length)) {
- columns += pickRandomColumn(r, df, targetType)
- }
- columns.toSeq
- }
- }
-
- private def pickRandomColumn(r: Random, df: DataFrame, targetType: SparkType): String = {
- targetType match {
- case SparkAnyType =>
- Utils.randomChoice(df.schema.fields, r).name
- case SparkBooleanType =>
- select(r, df, _.dataType == BooleanType)
- case SparkByteType =>
- select(r, df, _.dataType == ByteType)
- case SparkShortType =>
- select(r, df, _.dataType == ShortType)
- case SparkIntType =>
- select(r, df, _.dataType == IntegerType)
- case SparkLongType =>
- select(r, df, _.dataType == LongType)
- case SparkFloatType =>
- select(r, df, _.dataType == FloatType)
- case SparkDoubleType =>
- select(r, df, _.dataType == DoubleType)
- case SparkDecimalType(_, _) =>
- select(r, df, _.dataType.isInstanceOf[DecimalType])
- case SparkIntegralType =>
- select(
- r,
- df,
- f =>
- f.dataType == ByteType || f.dataType == ShortType ||
- f.dataType == IntegerType || f.dataType == LongType)
- case SparkNumericType =>
- select(r, df, f => isNumeric(f.dataType))
- case SparkStringType =>
- select(r, df, _.dataType == StringType)
- case SparkBinaryType =>
- select(r, df, _.dataType == BinaryType)
- case SparkDateType =>
- select(r, df, _.dataType == DateType)
- case SparkTimestampType =>
- select(r, df, _.dataType == TimestampType)
- case SparkDateOrTimestampType =>
- select(r, df, f => f.dataType == DateType || f.dataType == TimestampType)
- case SparkTypeOneOf(choices) =>
- pickRandomColumn(r, df, Utils.randomChoice(choices, r))
- case SparkArrayType(elementType) =>
- select(
- r,
- df,
- _.dataType match {
- case ArrayType(x, _) if typeMatch(elementType, x) => true
- case _ => false
- })
- case SparkMapType(keyType, valueType) =>
- select(
- r,
- df,
- _.dataType match {
- case MapType(k, v, _) if typeMatch(keyType, k) && typeMatch(valueType, v) => true
- case _ => false
- })
- case SparkStructType(fields) =>
- select(
- r,
- df,
- _.dataType match {
- case StructType(structFields) if structFields.length == fields.length => true
- case _ => false
- })
- case _ =>
- throw new IllegalStateException(targetType.toString)
- }
- }
-
- def pickTwoRandomColumns(r: Random, df: DataFrame, targetType: SparkType): (String, String) = {
- val a = pickRandomColumn(r, df, targetType)
- val df2 = df.drop(a)
- val b = pickRandomColumn(r, df2, targetType)
- (a, b)
- }
-
- /** Select a random field that matches a predicate */
- private def select(r: Random, df: DataFrame, predicate: StructField => Boolean): String = {
- val candidates = df.schema.fields.filter(predicate)
- if (candidates.isEmpty) {
- throw new IllegalStateException("Failed to find suitable column")
- }
- Utils.randomChoice(candidates, r).name
- }
-
- private def isNumeric(d: DataType): Boolean = {
- d match {
- case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
- _: DoubleType | _: DecimalType =>
- true
- case _ => false
- }
- }
-
- private def typeMatch(s: SparkType, d: DataType): Boolean = {
- (s, d) match {
- case (SparkAnyType, _) => true
- case (SparkBooleanType, BooleanType) => true
- case (SparkByteType, ByteType) => true
- case (SparkShortType, ShortType) => true
- case (SparkIntType, IntegerType) => true
- case (SparkLongType, LongType) => true
- case (SparkFloatType, FloatType) => true
- case (SparkDoubleType, DoubleType) => true
- case (SparkDecimalType(_, _), _: DecimalType) => true
- case (SparkIntegralType, ByteType | ShortType | IntegerType | LongType) => true
- case (SparkNumericType, _) if isNumeric(d) => true
- case (SparkStringType, StringType) => true
- case (SparkBinaryType, BinaryType) => true
- case (SparkDateType, DateType) => true
- case (SparkTimestampType, TimestampType | TimestampNTZType) => true
- case (SparkDateOrTimestampType, DateType | TimestampType | TimestampNTZType) => true
- case (SparkArrayType(elementType), ArrayType(elementDataType, _)) =>
- typeMatch(elementType, elementDataType)
- case (SparkMapType(keyType, valueType), MapType(keyDataType, valueDataType, _)) =>
- typeMatch(keyType, keyDataType) && typeMatch(valueType, valueDataType)
- case (SparkStructType(fields), StructType(structFields)) =>
- fields.length == structFields.length &&
- fields.zip(structFields.map(_.dataType)).forall { case (sparkType, dataType) =>
- typeMatch(sparkType, dataType)
- }
- case (SparkTypeOneOf(choices), _) =>
- choices.exists(choice => typeMatch(choice, d))
- case _ => false
- }
- }
-
- private def generateUnaryArithmetic(r: Random, spark: SparkSession, numFiles: Int): String = {
- val tableName = s"test${r.nextInt(numFiles)}"
- val table = spark.table(tableName)
-
- val op = Utils.randomChoice(Meta.unaryArithmeticOps, r)
- val a = pickRandomColumn(r, table, SparkNumericType)
-
- // Example SELECT a, -a FROM test0
- s"SELECT $a, $op$a " +
- s"FROM $tableName " +
- s"ORDER BY $a;"
- }
-
- private def generateBinaryArithmetic(r: Random, spark: SparkSession, numFiles: Int): String = {
- val tableName = s"test${r.nextInt(numFiles)}"
- val table = spark.table(tableName)
-
- val op = Utils.randomChoice(Meta.binaryArithmeticOps, r)
- val (a, b) = pickTwoRandomColumns(r, table, SparkNumericType)
-
- // Example SELECT a, b, a+b FROM test0
- s"SELECT $a, $b, $a $op $b " +
- s"FROM $tableName " +
- s"ORDER BY $a, $b;"
- }
-
- private def generateBinaryComparison(r: Random, spark: SparkSession, numFiles: Int): String = {
- val tableName = s"test${r.nextInt(numFiles)}"
- val table = spark.table(tableName)
-
- val op = Utils.randomChoice(Meta.comparisonOps, r)
-
- // pick two columns with the same type
- val opType = Utils.randomChoice(Meta.comparisonTypes, r)
- val (a, b) = pickTwoRandomColumns(r, table, opType)
-
- // Example SELECT a, b, a <=> b FROM test0
- s"SELECT $a, $b, $a $op $b " +
- s"FROM $tableName " +
- s"ORDER BY $a, $b;"
- }
-
- private def generateConditional(r: Random, spark: SparkSession, numFiles: Int): String = {
- val tableName = s"test${r.nextInt(numFiles)}"
- val table = spark.table(tableName)
-
- val op = Utils.randomChoice(Meta.comparisonOps, r)
-
- // pick two columns with the same type
- val opType = Utils.randomChoice(Meta.comparisonTypes, r)
- val (a, b) = pickTwoRandomColumns(r, table, opType)
-
- // Example SELECT a, b, IF(a <=> b, 1, 2), CASE WHEN a <=> b THEN 1 ELSE 2 END FROM test0
- s"SELECT $a, $b, $a $op $b, IF($a $op $b, 1, 2), CASE WHEN $a $op $b THEN 1 ELSE 2 END " +
- s"FROM $tableName " +
- s"ORDER BY $a, $b;"
- }
-
- private def generateCast(r: Random, spark: SparkSession, numFiles: Int): String = {
- val tableName = s"test${r.nextInt(numFiles)}"
- val table = spark.table(tableName)
-
- val toType = Utils.randomWeightedChoice(Meta.dataTypes, r).sql
- val arg = Utils.randomChoice(table.columns, r)
-
- // We test both `cast` and `try_cast` to cover LEGACY and TRY eval modes. It is not
- // recommended to run Comet Fuzz with ANSI enabled currently.
- // Example SELECT c0, cast(c0 as float), try_cast(c0 as float) FROM test0
- s"SELECT $arg, cast($arg as $toType), try_cast($arg as $toType) " +
- s"FROM $tableName " +
- s"ORDER BY $arg;"
- }
-
- private def generateJoin(r: Random, spark: SparkSession, numFiles: Int): String = {
- val leftTableName = s"test${r.nextInt(numFiles)}"
- val rightTableName = s"test${r.nextInt(numFiles)}"
- val leftTable = spark.table(leftTableName)
- val rightTable = spark.table(rightTableName)
-
- val leftCol = Utils.randomChoice(leftTable.columns, r)
- val rightCol = Utils.randomChoice(rightTable.columns, r)
-
- val joinTypes = Seq(("INNER", 0.4), ("LEFT", 0.3), ("RIGHT", 0.3))
- val joinType = Utils.randomWeightedChoice(joinTypes, r)
-
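- // Example SELECT l.c0, ..., r.c0, ... FROM test0 l INNER JOIN test1 r ON l.c1 = r.c2 ORDER BY l.c0, ..., r.c0, ...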
- val leftColProjection = leftTable.columns.map(c => s"l.$c").mkString(", ")
- val rightColProjection = rightTable.columns.map(c => s"r.$c").mkString(", ")
- "SELECT " +
- s"$leftColProjection, " +
- s"$rightColProjection " +
- s"FROM $leftTableName l " +
- s"$joinType JOIN $rightTableName r " +
- s"ON l.$leftCol = r.$rightCol " +
- "ORDER BY " +
- s"$leftColProjection, " +
- s"$rightColProjection;"
- }
-
-}
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
deleted file mode 100644
index dc7189c533..0000000000
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.fuzz
-
-import java.io.{BufferedWriter, FileWriter, PrintWriter, StringWriter}
-
-import scala.collection.mutable
-import scala.io.Source
-
-import org.apache.spark.sql.{Row, SparkSession}
-
-import org.apache.comet.fuzz.QueryComparison.showPlans
-
-object QueryRunner {
-
- def createOutputMdFile(): BufferedWriter = {
- val outputFilename = s"results-${System.currentTimeMillis()}.md"
- // scalastyle:off println
- println(s"Writing results to $outputFilename")
- // scalastyle:on println
-
- new BufferedWriter(new FileWriter(outputFilename))
- }
-
- def runQueries(
- spark: SparkSession,
- numFiles: Int,
- filename: String,
- showFailedSparkQueries: Boolean = false): Unit = {
-
- var queryCount = 0
- var invalidQueryCount = 0
- var cometFailureCount = 0
- var cometSuccessCount = 0
-
- val w = createOutputMdFile()
-
- // register input files
- for (i <- 0 until numFiles) {
- val table = spark.read.parquet(s"test$i.parquet")
- val tableName = s"test$i"
- table.createTempView(tableName)
- w.write(
- s"Created table $tableName with schema:\n\t" +
- s"${table.schema.fields.map(f => s"${f.name}: ${f.dataType}").mkString("\n\t")}\n\n")
- }
-
- val querySource = Source.fromFile(filename)
- try {
- querySource
- .getLines()
- .foreach(sql => {
- queryCount += 1
- try {
- // execute with Spark
- spark.conf.set("spark.comet.enabled", "false")
- val df = spark.sql(sql)
- val sparkRows = df.collect()
- val sparkPlan = df.queryExecution.executedPlan.toString
-
- // execute with Comet
- try {
- spark.conf.set("spark.comet.enabled", "true")
- val df = spark.sql(sql)
- val cometRows = df.collect()
- val cometPlan = df.queryExecution.executedPlan.toString
-
- var success = QueryComparison.assertSameRows(sparkRows, cometRows, output = w)
-
- // check that the plan contains Comet operators
- if (!cometPlan.contains("Comet")) {
- success = false
- w.write("[ERROR] Comet did not accelerate any part of the plan\n")
- }
-
- QueryComparison.showSQL(w, sql)
-
- if (success) {
- cometSuccessCount += 1
- } else {
- // show plans for failed queries
- showPlans(w, sparkPlan, cometPlan)
- cometFailureCount += 1
- }
-
- } catch {
- case e: Exception =>
- // the query worked in Spark but failed in Comet, so this is likely a bug in Comet
- cometFailureCount += 1
- QueryComparison.showSQL(w, sql)
- w.write("### Spark Plan\n")
- w.write(s"```\n$sparkPlan\n```\n")
-
- w.write(s"[ERROR] Query failed in Comet: ${e.getMessage}:\n")
- w.write("```\n")
- val sw = new StringWriter()
- val p = new PrintWriter(sw)
- e.printStackTrace(p)
- p.close()
- w.write(s"${sw.toString}\n")
- w.write("```\n")
- }
-
- // flush after every query so that results are saved in the event of the driver crashing
- w.flush()
-
- } catch {
- case e: Exception =>
- // we expect many generated queries to be invalid
- invalidQueryCount += 1
- if (showFailedSparkQueries) {
- QueryComparison.showSQL(w, sql)
- w.write(s"Query failed in Spark: ${e.getMessage}\n")
- }
- }
- })
-
- w.write("# Summary\n")
- w.write(
- s"Total queries: $queryCount; Invalid queries: $invalidQueryCount; " +
- s"Comet failed: $cometFailureCount; Comet succeeded: $cometSuccessCount\n")
-
- } finally {
- w.close()
- querySource.close()
- }
- }
-}
-
-object QueryComparison {
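- /** Compares row counts, schemas (first row only), and individual values, writing
- any mismatch to `output`; floating-point values use an absolute `tolerance`. */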
- def assertSameRows(
- sparkRows: Array[Row],
- cometRows: Array[Row],
- output: BufferedWriter,
- tolerance: Double = 0.000001): Boolean = {
- if (sparkRows.length == cometRows.length) {
- var i = 0
- while (i < sparkRows.length) {
- val l = sparkRows(i)
- val r = cometRows(i)
-
- // Check the schema is equal for first row only
- if (i == 0 && l.schema != r.schema) {
- output.write("[ERROR] Spark produced different schema than Comet.\n")
-
- return false
- }
-
- assert(l.length == r.length)
- for (j <- 0 until l.length) {
- if (!same(l(j), r(j), tolerance)) {
- output.write(s"First difference at row $i:\n")
- output.write("Spark: `" + formatRow(l) + "`\n")
- output.write("Comet: `" + formatRow(r) + "`\n")
- i = sparkRows.length
-
- return false
- }
- }
- i += 1
- }
- } else {
- output.write(
- s"[ERROR] Spark produced ${sparkRows.length} rows and " +
- s"Comet produced ${cometRows.length} rows.\n")
-
- return false
- }
-
- true
- }
-
- private def same(l: Any, r: Any, tolerance: Double): Boolean = {
- if (l == null || r == null) {
- return l == null && r == null
- }
- (l, r) match {
- case (a: Float, b: Float) if a.isPosInfinity => b.isPosInfinity
- case (a: Float, b: Float) if a.isNegInfinity => b.isNegInfinity
- case (a: Float, b: Float) if a.isNaN => b.isNaN
- case (a: Float, b: Float) => (a - b).abs <= tolerance
- case (a: Double, b: Double) if a.isPosInfinity => b.isPosInfinity
- case (a: Double, b: Double) if a.isNegInfinity => b.isNegInfinity
- case (a: Double, b: Double) if a.isNaN => b.isNaN
- case (a: Double, b: Double) => (a - b).abs <= tolerance
- case (a: Array[_], b: Array[_]) =>
- a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance))
- case (a: mutable.WrappedArray[_], b: mutable.WrappedArray[_]) =>
- a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance))
- case (a: Row, b: Row) =>
- val aa = a.toSeq
- val bb = b.toSeq
- aa.length == bb.length && aa.zip(bb).forall(x => same(x._1, x._2, tolerance))
- case (a, b) => a == b
- }
- }
-
- private def format(value: Any): String = {
- value match {
- case null => "NULL"
- case v: mutable.WrappedArray[_] => s"[${v.map(format).mkString(",")}]"
- case v: Array[Byte] => s"[${v.mkString(",")}]"
- case r: Row => formatRow(r)
- case other => other.toString
- }
- }
-
- private def formatRow(row: Row): String = {
- row.toSeq.map(format).mkString(",")
- }
-
- def showSQL(w: BufferedWriter, sql: String, maxLength: Int = 120): Unit = {
- w.write("## SQL\n")
- w.write("```\n")
- val words = sql.split(" ")
- val currentLine = new StringBuilder
- for (word <- words) {
- if (currentLine.length + word.length + 1 > maxLength) {
- w.write(currentLine.toString.trim)
- w.write("\n")
- currentLine.setLength(0)
- }
- currentLine.append(word).append(" ")
- }
- if (currentLine.nonEmpty) {
- w.write(currentLine.toString.trim)
- w.write("\n")
- }
- w.write("```\n")
- }
-
- def showPlans(w: BufferedWriter, sparkPlan: String, cometPlan: String): Unit = {
- w.write("### Spark Plan\n")
- w.write(s"```\n$sparkPlan\n```\n")
- w.write("### Comet Plan\n")
- w.write(s"```\n$cometPlan\n```\n")
- }
-
- def showSchema(w: BufferedWriter, sparkSchema: String, cometSchema: String): Unit = {
- w.write("### Spark Schema\n")
- w.write(s"```\n$sparkSchema\n```\n")
- w.write("### Comet Schema\n")
- w.write(s"```\n$cometSchema\n```\n")
- }
-}
diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala
deleted file mode 100644
index 4d51c60e54..0000000000
--- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.fuzz
-
-import scala.util.Random
-
-object Utils {
-
- def randomChoice[T](list: Seq[T], r: Random): T = {
- list(r.nextInt(list.length))
- }
-
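- // Example: randomWeightedChoice(Seq(("a", 1.0), ("b", 3.0)), r) returns "b" about
- // three times as often as "a"; weights are relative and need not sum to 1.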
- def randomWeightedChoice[T](valuesWithWeights: Seq[(T, Double)], r: Random): T = {
- val totalWeight = valuesWithWeights.map(_._2).sum
- val randomValue = r.nextDouble() * totalWeight
- var cumulativeWeight = 0.0
-
- for ((value, weight) <- valuesWithWeights) {
- cumulativeWeight += weight
- if (cumulativeWeight >= randomValue) {
- return value
- }
- }
-
- // If for some reason the loop doesn't return, return the last value
- valuesWithWeights.last._1
- }
-
-}
diff --git a/kube/Dockerfile b/kube/Dockerfile
index 699aeeb210..5c0e6d3b66 100644
--- a/kube/Dockerfile
+++ b/kube/Dockerfile
@@ -55,7 +55,6 @@ COPY mvnw /comet/mvnw
COPY common /comet/common
COPY dev /comet/dev
COPY docs /comet/docs
-COPY fuzz-testing /comet/fuzz-testing
COPY spark /comet/spark
COPY spark-integration /comet/spark-integration
COPY scalafmt.conf /comet/scalafmt.conf
diff --git a/pom.xml b/pom.xml
index d1c81b8352..22c97ea1eb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,7 +38,6 @@ under the License.
 <module>common</module>
 <module>spark</module>
 <module>spark-integration</module>
- <module>fuzz-testing</module>