From 93121fb277888006b31e49ee01054405d4872aed Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 25 Apr 2026 09:29:27 -0600 Subject: [PATCH] build: remove fuzz-testing maven module Remove the standalone fuzz-testing module along with its references in pom.xml, .dockerignore, kube/Dockerfile, and the spark_sql_test and iceberg_spark_test workflow paths-ignore lists. --- .dockerignore | 1 - .github/workflows/iceberg_spark_test.yml | 2 - .github/workflows/spark_sql_test.yml | 2 - fuzz-testing/.gitignore | 6 - fuzz-testing/README.md | 120 ------ fuzz-testing/pom.xml | 129 ------ fuzz-testing/run.sh | 87 ---- .../apache/comet/fuzz/ComparisonTool.scala | 153 ------- .../scala/org/apache/comet/fuzz/Main.scala | 124 ------ .../scala/org/apache/comet/fuzz/Meta.scala | 390 ------------------ .../org/apache/comet/fuzz/QueryGen.scala | 367 ---------------- .../org/apache/comet/fuzz/QueryRunner.scala | 264 ------------ .../scala/org/apache/comet/fuzz/Utils.scala | 46 --- kube/Dockerfile | 1 - pom.xml | 1 - 15 files changed, 1693 deletions(-) delete mode 100644 fuzz-testing/.gitignore delete mode 100644 fuzz-testing/README.md delete mode 100644 fuzz-testing/pom.xml delete mode 100755 fuzz-testing/run.sh delete mode 100644 fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala delete mode 100644 fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala delete mode 100644 fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala delete mode 100644 fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala delete mode 100644 fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala delete mode 100644 fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala diff --git a/.dockerignore b/.dockerignore index 332acff447..c5b0e9c12c 100644 --- a/.dockerignore +++ b/.dockerignore @@ -10,7 +10,6 @@ metastore_db target common/target spark-integration/target -fuzz-testing/target spark/target native/target core/target diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml index c97445ea1d..edc6f5b46d 100644 --- a/.github/workflows/iceberg_spark_test.yml +++ b/.github/workflows/iceberg_spark_test.yml @@ -36,7 +36,6 @@ on: - "spark/src/test/**" - "common/src/test/**" - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala" - - "fuzz-testing/**" - "spark-integration/**" pull_request: paths-ignore: @@ -50,7 +49,6 @@ on: - "spark/src/test/**" - "common/src/test/**" - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala" - - "fuzz-testing/**" - "spark-integration/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow diff --git a/.github/workflows/spark_sql_test.yml b/.github/workflows/spark_sql_test.yml index 70f3fc92d5..5d23acb2f8 100644 --- a/.github/workflows/spark_sql_test.yml +++ b/.github/workflows/spark_sql_test.yml @@ -36,7 +36,6 @@ on: - "spark/src/test/**" - "common/src/test/**" - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala" - - "fuzz-testing/**" - "spark-integration/**" pull_request: paths-ignore: @@ -50,7 +49,6 @@ on: - "spark/src/test/**" - "common/src/test/**" - "spark/src/main/scala/org/apache/comet/GenerateDocs.scala" - - "fuzz-testing/**" - "spark-integration/**" # manual trigger # https://docs.github.com/en/actions/managing-workflow-runs/manually-running-a-workflow diff --git a/fuzz-testing/.gitignore b/fuzz-testing/.gitignore deleted file mode 100644 index 570ff02a76..0000000000 --- a/fuzz-testing/.gitignore +++ /dev/null @@ -1,6 +0,0 @@ -.idea -target -spark-warehouse -queries.sql -results*.md -test*.parquet \ No newline at end of file diff --git a/fuzz-testing/README.md b/fuzz-testing/README.md deleted file mode 100644 index c9f4d881d4..0000000000 --- a/fuzz-testing/README.md +++ /dev/null @@ -1,120 +0,0 @@ - - -# Comet Fuzz - -Comet Fuzz is a standalone project for generating random data and queries and executing queries against Spark -with Comet disabled and enabled and checking for incompatibilities. - -Although it is a simple tool it has already been useful in finding many bugs. - -Comet Fuzz is inspired by the [SparkFuzz](https://ir.cwi.nl/pub/30222) paper from Databricks and CWI. - -## Roadmap - -Planned areas of improvement: - -- ANSI mode -- Support for all data types, expressions, and operators supported by Comet -- IF and CASE WHEN expressions -- Complex (nested) expressions -- Literal scalar values in queries -- Add option to avoid grouping and sorting on floating-point columns -- Improve join query support: - - Support joins without join keys - - Support composite join keys - - Support multiple join keys - - Support join conditions that use expressions - -## Usage - -From the root of the project, run `mvn install -DskipTests` to install Comet. - -Then build the fuzz testing jar. - -```shell -mvn package -``` - -Set appropriate values for `SPARK_HOME`, `SPARK_MASTER`, and `COMET_JAR` environment variables and then use -`spark-submit` to run CometFuzz against a Spark cluster. - -### Generating Data Files - -```shell -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --class org.apache.comet.fuzz.Main \ - target/comet-fuzz-spark3.5_2.12-0.13.0-SNAPSHOT-jar-with-dependencies.jar \ - data --num-files=2 --num-rows=200 --exclude-negative-zero --generate-arrays --generate-structs --generate-maps -``` - -There is an optional `--exclude-negative-zero` flag for excluding `-0.0` from the generated data, which is -sometimes useful because we already know that we often have different behavior for this edge case due to -differences between Rust and Java handling of this value. - -### Generating Queries - -Generate random queries that are based on the available test files. - -```shell -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --class org.apache.comet.fuzz.Main \ - target/comet-fuzz-spark3.5_2.12-0.13.0-SNAPSHOT-jar-with-dependencies.jar \ - queries --num-files=2 --num-queries=500 -``` - -Note that the output filename is currently hard-coded as `queries.sql` - -### Execute Queries - -```shell -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16G \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.comet.enabled=true \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.exec.shuffle.enabled=true \ - --jars $COMET_JAR \ - --conf spark.driver.extraClassPath=$COMET_JAR \ - --conf spark.executor.extraClassPath=$COMET_JAR \ - --class org.apache.comet.fuzz.Main \ - target/comet-fuzz-spark3.5_2.12-0.13.0-SNAPSHOT-jar-with-dependencies.jar \ - run --num-files=2 --filename=queries.sql -``` - -Note that the output filename is currently hard-coded as `results-${System.currentTimeMillis()}.md` - -### Compare existing datasets - -To compare a pair of existing datasets you can use a comparison tool. -The example below is for TPC-H queries results generated by pure Spark and Comet - -```shell -$SPARK_HOME/bin/spark-submit \ - --master $SPARK_MASTER \ - --class org.apache.comet.fuzz.ComparisonTool - target/comet-fuzz-spark3.5_2.12-0.13.0-SNAPSHOT-jar-with-dependencies.jar \ - compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet -``` - -The tool takes a pair of existing folders of the same layout and compares subfolders treating them as parquet based datasets diff --git a/fuzz-testing/pom.xml b/fuzz-testing/pom.xml deleted file mode 100644 index f4f1d4ef2b..0000000000 --- a/fuzz-testing/pom.xml +++ /dev/null @@ -1,129 +0,0 @@ - - - - 4.0.0 - - - org.apache.datafusion - comet-parent-spark${spark.version.short}_${scala.binary.version} - 0.16.0-SNAPSHOT - ../pom.xml - - - comet-fuzz-spark${spark.version.short}_${scala.binary.version} - comet-fuzz - http://maven.apache.org - jar - - - - false - - - - - org.scala-lang - scala-library - ${scala.version} - provided - - - org.apache.spark - spark-sql_${scala.binary.version} - provided - - - org.scala-lang.modules - scala-collection-compat_${scala.binary.version} - - - - - org.apache.datafusion - comet-spark-spark${spark.version.short}_${scala.binary.version} - ${project.version} - - - org.scala-lang.modules - scala-collection-compat_${scala.binary.version} - - - - - org.rogach - scallop_${scala.binary.version} - - - - - src/main/scala - - - org.apache.maven.plugins - maven-compiler-plugin - ${maven-compiler-plugin.version} - - ${java.version} - ${java.version} - - - - net.alchim31.maven - scala-maven-plugin - ${scala.plugin.version} - - - - compile - testCompile - - - - - - maven-assembly-plugin - ${maven-assembly-plugin.version} - - - jar-with-dependencies - - - - - make-assembly - package - - single - - - - - - org.apache.maven.plugins - maven-install-plugin - - true - - - - - diff --git a/fuzz-testing/run.sh b/fuzz-testing/run.sh deleted file mode 100755 index 27391249c2..0000000000 --- a/fuzz-testing/run.sh +++ /dev/null @@ -1,87 +0,0 @@ -#!/bin/bash -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -# Usage: ./run.sh -# Builds necessary JARs, generates data and queries, and runs fuzz tests for Comet Spark. -# Environment variables: -# SPARK_HOME - path to Spark installation -# SPARK_MASTER - Spark master URL (default: local[*]) -# SCALA_MAJOR_VERSION - Scala major version to use (default: 2.12) -# SPARK_MAJOR_VERSION - Spark major version to use (default: 3.5) -# NUM_FILES - number of data files to generate (default: 2) -# NUM_ROWS - number of rows per file (default: 200) -# NUM_QUERIES - number of queries to generate (default: 500) - -set -eux - -DIR="$(cd "$(dirname "$0")" && pwd)" -PARENT_DIR="${DIR}/.." -MVN_CMD="${PARENT_DIR}/mvnw" -SPARK_MASTER="${SPARK_MASTER:-local[*]}" -SCALA_MAJOR_VERSION="${SCALA_MAJOR_VERSION:-2.12}" -SPARK_MAJOR_VERSION="${SPARK_MAJOR_VERSION:-3.5}" -PROFILES="-Pscala-${SCALA_MAJOR_VERSION},spark-${SPARK_MAJOR_VERSION}" -PROJECT_VERSION=$("${MVN_CMD}" -f "${DIR}/pom.xml" -q help:evaluate -Dexpression=project.version -DforceStdout) -COMET_SPARK_JAR="${PARENT_DIR}/spark/target/comet-spark${SPARK_MAJOR_VERSION}_${SCALA_MAJOR_VERSION}-${PROJECT_VERSION}.jar" -COMET_FUZZ_JAR="${DIR}/target/comet-fuzz-spark${SPARK_MAJOR_VERSION}_${SCALA_MAJOR_VERSION}-${PROJECT_VERSION}-jar-with-dependencies.jar" -NUM_FILES="${NUM_FILES:-2}" -NUM_ROWS="${NUM_ROWS:-200}" -NUM_QUERIES="${NUM_QUERIES:-500}" - -if [ ! -f "${COMET_SPARK_JAR}" ]; then - echo "Building Comet Spark jar..." - pushd "${PARENT_DIR}" - PROFILES="${PROFILES}" make - popd -else - echo "Building Fuzz testing jar..." - "${MVN_CMD}" -f "${DIR}/pom.xml" package -DskipTests "${PROFILES}" -fi - -echo "Generating data..." -"${SPARK_HOME}/bin/spark-submit" \ - --master "${SPARK_MASTER}" \ - --class org.apache.comet.fuzz.Main \ - "${COMET_FUZZ_JAR}" \ - data --num-files="${NUM_FILES}" --num-rows="${NUM_ROWS}" \ - --exclude-negative-zero \ - --generate-arrays --generate-structs --generate-maps - -echo "Generating queries..." -"${SPARK_HOME}/bin/spark-submit" \ - --master "${SPARK_MASTER}" \ - --class org.apache.comet.fuzz.Main \ - "${COMET_FUZZ_JAR}" \ - queries --num-files="${NUM_FILES}" --num-queries="${NUM_QUERIES}" - -echo "Running fuzz tests..." -"${SPARK_HOME}/bin/spark-submit" \ - --master "${SPARK_MASTER}" \ - --conf spark.memory.offHeap.enabled=true \ - --conf spark.memory.offHeap.size=16G \ - --conf spark.plugins=org.apache.spark.CometPlugin \ - --conf spark.comet.enabled=true \ - --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \ - --conf spark.comet.exec.shuffle.enabled=true \ - --jars "${COMET_SPARK_JAR}" \ - --conf spark.driver.extraClassPath="${COMET_SPARK_JAR}" \ - --conf spark.executor.extraClassPath="${COMET_SPARK_JAR}" \ - --class org.apache.comet.fuzz.Main \ - "${COMET_FUZZ_JAR}" \ - run --num-files="${NUM_FILES}" --filename="queries.sql" diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala deleted file mode 100644 index 055ea6553f..0000000000 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.fuzz - -import java.io.File - -import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand} - -import org.apache.spark.sql.{functions, SparkSession} - -class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) { - object compareParquet extends Subcommand("compareParquet") { - val inputSparkFolder: ScallopOption[String] = - opt[String](required = true, descr = "Folder with Spark produced results in Parquet format") - val inputCometFolder: ScallopOption[String] = - opt[String](required = true, descr = "Folder with Comet produced results in Parquet format") - val tolerance: ScallopOption[Double] = - opt[Double](default = Some(0.000002), descr = "Tolerance for floating point comparisons") - } - addSubcommand(compareParquet) - verify() -} - -object ComparisonTool { - - lazy val spark: SparkSession = SparkSession - .builder() - .getOrCreate() - - def main(args: Array[String]): Unit = { - val conf = new ComparisonToolConf(args.toIndexedSeq) - conf.subcommand match { - case Some(conf.compareParquet) => - compareParquetFolders( - spark, - conf.compareParquet.inputSparkFolder(), - conf.compareParquet.inputCometFolder(), - conf.compareParquet.tolerance()) - - case _ => - // scalastyle:off println - println("Invalid subcommand") - // scalastyle:on println - sys.exit(-1) - } - } - - private def compareParquetFolders( - spark: SparkSession, - sparkFolderPath: String, - cometFolderPath: String, - tolerance: Double): Unit = { - - val output = QueryRunner.createOutputMdFile() - - try { - val sparkFolder = new File(sparkFolderPath) - val cometFolder = new File(cometFolderPath) - - if (!sparkFolder.exists() || !sparkFolder.isDirectory) { - throw new IllegalArgumentException( - s"Spark folder does not exist or is not a directory: $sparkFolderPath") - } - - if (!cometFolder.exists() || !cometFolder.isDirectory) { - throw new IllegalArgumentException( - s"Comet folder does not exist or is not a directory: $cometFolderPath") - } - - // Get all subdirectories from the Spark folder - val sparkSubfolders = sparkFolder - .listFiles() - .filter(_.isDirectory) - .map(_.getName) - .sorted - - output.write("# Comparing Parquet Folders\n\n") - output.write(s"Spark folder: $sparkFolderPath\n") - output.write(s"Comet folder: $cometFolderPath\n") - output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n") - - // Compare each subfolder - sparkSubfolders.foreach { subfolderName => - val sparkSubfolderPath = new File(sparkFolder, subfolderName) - val cometSubfolderPath = new File(cometFolder, subfolderName) - - if (!cometSubfolderPath.exists() || !cometSubfolderPath.isDirectory) { - output.write(s"## Subfolder: $subfolderName\n") - output.write( - s"[WARNING] Comet subfolder not found: ${cometSubfolderPath.getAbsolutePath}\n\n") - } else { - output.write(s"## Comparing subfolder: $subfolderName\n\n") - - try { - // Read Spark parquet files - spark.conf.set("spark.comet.enabled", "false") - val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath) - val sparkRows = sparkDf.orderBy(sparkDf.columns.map(functions.col): _*).collect() - - // Read Comet parquet files - val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath) - val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect() - - // Compare the results - if (QueryComparison.assertSameRows(sparkRows, cometRows, output, tolerance)) { - output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n") - } else { - // Output schema if dataframes are not equal - QueryComparison.showSchema( - output, - sparkDf.schema.treeString, - cometDf.schema.treeString) - } - } catch { - case e: Exception => - output.write( - s"[ERROR] Failed to compare subfolder $subfolderName: ${e.getMessage}\n") - val sw = new java.io.StringWriter() - val p = new java.io.PrintWriter(sw) - e.printStackTrace(p) - p.close() - output.write(s"```\n${sw.toString}\n```\n\n") - } - } - - output.flush() - } - - output.write("\n# Comparison Complete\n") - output.write(s"Compared ${sparkSubfolders.length} subfolders\n") - - } finally { - output.close() - } - } -} diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala deleted file mode 100644 index ced2ad28c4..0000000000 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.fuzz - -import scala.util.Random - -import org.rogach.scallop.{ScallopConf, Subcommand} -import org.rogach.scallop.ScallopOption - -import org.apache.spark.sql.SparkSession - -import org.apache.comet.testing.{DataGenOptions, ParquetGenerator, SchemaGenOptions} - -class Conf(arguments: Seq[String]) extends ScallopConf(arguments) { - object generateData extends Subcommand("data") { - val numFiles: ScallopOption[Int] = - opt[Int](required = true, descr = "Number of files to generate") - val numRows: ScallopOption[Int] = opt[Int](required = true, descr = "Number of rows per file") - val randomSeed: ScallopOption[Long] = - opt[Long](required = false, descr = "Random seed to use") - val generateArrays: ScallopOption[Boolean] = - opt[Boolean](required = false, descr = "Whether to generate arrays") - val generateStructs: ScallopOption[Boolean] = - opt[Boolean](required = false, descr = "Whether to generate structs") - val generateMaps: ScallopOption[Boolean] = - opt[Boolean](required = false, descr = "Whether to generate maps") - val excludeNegativeZero: ScallopOption[Boolean] = - opt[Boolean](required = false, descr = "Whether to exclude negative zero") - } - addSubcommand(generateData) - object generateQueries extends Subcommand("queries") { - val numFiles: ScallopOption[Int] = - opt[Int](required = true, descr = "Number of input files to use") - val numQueries: ScallopOption[Int] = - opt[Int](required = true, descr = "Number of queries to generate") - val randomSeed: ScallopOption[Long] = - opt[Long](required = false, descr = "Random seed to use") - } - addSubcommand(generateQueries) - object runQueries extends Subcommand("run") { - val filename: ScallopOption[String] = - opt[String](required = true, descr = "File to write queries to") - val numFiles: ScallopOption[Int] = - opt[Int](required = true, descr = "Number of input files to use") - val showFailedSparkQueries: ScallopOption[Boolean] = - opt[Boolean](required = false, descr = "Whether to show failed Spark queries") - } - addSubcommand(runQueries) - verify() -} - -object Main { - - lazy val spark: SparkSession = SparkSession - .builder() - .getOrCreate() - - def main(args: Array[String]): Unit = { - val conf = new Conf(args.toIndexedSeq) - conf.subcommand match { - case Some(conf.generateData) => - val r = conf.generateData.randomSeed.toOption match { - case Some(seed) => new Random(seed) - case None => new Random() - } - for (i <- 0 until conf.generateData.numFiles()) { - ParquetGenerator.makeParquetFile( - r, - spark, - s"test$i.parquet", - numRows = conf.generateData.numRows(), - SchemaGenOptions( - generateArray = conf.generateData.generateArrays(), - generateStruct = conf.generateData.generateStructs(), - generateMap = conf.generateData.generateMaps(), - // create two columns of each primitive type so that they can be used in binary - // expressions such as `a + b` and `a < b` - primitiveTypes = SchemaGenOptions.defaultPrimitiveTypes ++ - SchemaGenOptions.defaultPrimitiveTypes), - DataGenOptions( - allowNull = true, - generateNegativeZero = !conf.generateData.excludeNegativeZero())) - } - case Some(conf.generateQueries) => - val r = conf.generateQueries.randomSeed.toOption match { - case Some(seed) => new Random(seed) - case None => new Random() - } - QueryGen.generateRandomQueries( - r, - spark, - numFiles = conf.generateQueries.numFiles(), - conf.generateQueries.numQueries()) - case Some(conf.runQueries) => - QueryRunner.runQueries( - spark, - conf.runQueries.numFiles(), - conf.runQueries.filename(), - conf.runQueries.showFailedSparkQueries()) - case _ => - // scalastyle:off println - println("Invalid subcommand") - // scalastyle:on println - sys.exit(-1) - } - } -} diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala deleted file mode 100644 index d390e86aee..0000000000 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Meta.scala +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.fuzz - -import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.types.DataTypes - -sealed trait SparkType -case class SparkTypeOneOf(dataTypes: Seq[SparkType]) extends SparkType -case object SparkBooleanType extends SparkType -case object SparkBinaryType extends SparkType -case object SparkStringType extends SparkType -case object SparkIntegralType extends SparkType -case object SparkByteType extends SparkType -case object SparkShortType extends SparkType -case object SparkIntType extends SparkType -case object SparkLongType extends SparkType -case object SparkFloatType extends SparkType -case object SparkDoubleType extends SparkType -case class SparkDecimalType(p: Int, s: Int) extends SparkType -case object SparkNumericType extends SparkType -case object SparkDateType extends SparkType -case object SparkTimestampType extends SparkType -case object SparkDateOrTimestampType extends SparkType -case class SparkArrayType(elementType: SparkType) extends SparkType -case class SparkMapType(keyType: SparkType, valueType: SparkType) extends SparkType -case class SparkStructType(fields: Seq[SparkType]) extends SparkType -case object SparkAnyType extends SparkType - -case class FunctionSignature(inputTypes: Seq[SparkType], varArgs: Boolean = false) - -case class Function(name: String, signatures: Seq[FunctionSignature]) - -object Meta { - - val primitiveSparkTypes: Seq[SparkType] = Seq( - SparkBooleanType, - SparkBinaryType, - SparkStringType, - SparkByteType, - SparkShortType, - SparkIntType, - SparkLongType, - SparkFloatType, - SparkDoubleType, - SparkDateType, - SparkTimestampType) - - val dataTypes: Seq[(DataType, Double)] = Seq( - (DataTypes.BooleanType, 0.1), - (DataTypes.ByteType, 0.2), - (DataTypes.ShortType, 0.2), - (DataTypes.IntegerType, 0.2), - (DataTypes.LongType, 0.2), - (DataTypes.FloatType, 0.2), - (DataTypes.DoubleType, 0.2), - (DataTypes.createDecimalType(10, 2), 0.2), - (DataTypes.DateType, 0.2), - (DataTypes.TimestampType, 0.2), - (DataTypes.TimestampNTZType, 0.2), - (DataTypes.StringType, 0.2), - (DataTypes.BinaryType, 0.1)) - - private def createFunctionWithInputTypes( - name: String, - inputs: Seq[SparkType], - varArgs: Boolean = false): Function = { - Function(name, Seq(FunctionSignature(inputs, varArgs))) - createFunctions(name, Seq(FunctionSignature(inputs, varArgs))) - } - - private def createFunctions(name: String, signatures: Seq[FunctionSignature]): Function = { - signatures.foreach { s => - assert( - !s.varArgs || s.inputTypes.length == 1, - s"Variadic function $s must have exactly one input type") - } - Function(name, signatures) - } - - private def createUnaryStringFunction(name: String): Function = { - createFunctionWithInputTypes(name, Seq(SparkStringType)) - } - - private def createUnaryNumericFunction(name: String): Function = { - createFunctionWithInputTypes(name, Seq(SparkNumericType)) - } - - // Math expressions (corresponds to mathExpressions in QueryPlanSerde) - val mathScalarFunc: Seq[Function] = Seq( - createUnaryNumericFunction("abs"), - createUnaryNumericFunction("acos"), - createUnaryNumericFunction("asin"), - createUnaryNumericFunction("atan"), - createFunctionWithInputTypes("atan2", Seq(SparkNumericType, SparkNumericType)), - createUnaryNumericFunction("cos"), - createUnaryNumericFunction("cosh"), - createUnaryNumericFunction("exp"), - createUnaryNumericFunction("expm1"), - createFunctionWithInputTypes("log", Seq(SparkNumericType, SparkNumericType)), - createUnaryNumericFunction("log10"), - createUnaryNumericFunction("log2"), - createFunctionWithInputTypes("pow", Seq(SparkNumericType, SparkNumericType)), - createFunctionWithInputTypes("remainder", Seq(SparkNumericType, SparkNumericType)), - createFunctions( - "round", - Seq( - FunctionSignature(Seq(SparkNumericType)), - FunctionSignature(Seq(SparkNumericType, SparkIntType)))), - createUnaryNumericFunction("signum"), - createUnaryNumericFunction("sin"), - createUnaryNumericFunction("sinh"), - createUnaryNumericFunction("sqrt"), - createUnaryNumericFunction("tan"), - createUnaryNumericFunction("tanh"), - createUnaryNumericFunction("cot"), - createUnaryNumericFunction("ceil"), - createUnaryNumericFunction("floor"), - createFunctionWithInputTypes("unary_minus", Seq(SparkNumericType))) - - // Hash expressions (corresponds to hashExpressions in QueryPlanSerde) - val hashScalarFunc: Seq[Function] = Seq( - createFunctionWithInputTypes("md5", Seq(SparkAnyType)), - createFunctionWithInputTypes("murmur3_hash", Seq(SparkAnyType)), // TODO variadic - createFunctionWithInputTypes("sha2", Seq(SparkAnyType, SparkIntType))) - - // String expressions (corresponds to stringExpressions in QueryPlanSerde) - val stringScalarFunc: Seq[Function] = Seq( - createUnaryStringFunction("ascii"), - createUnaryStringFunction("bit_length"), - createUnaryStringFunction("chr"), - createFunctionWithInputTypes( - "concat", - Seq( - SparkTypeOneOf( - Seq( - SparkStringType, - SparkBinaryType, - SparkArrayType(SparkStringType), - SparkArrayType(SparkNumericType), - SparkArrayType(SparkBinaryType)))), - varArgs = true), - createFunctionWithInputTypes("concat_ws", Seq(SparkStringType, SparkStringType)), - createFunctionWithInputTypes("contains", Seq(SparkStringType, SparkStringType)), - createFunctionWithInputTypes("ends_with", Seq(SparkStringType, SparkStringType)), - createFunctionWithInputTypes( - "hex", - Seq(SparkTypeOneOf(Seq(SparkStringType, SparkBinaryType, SparkIntType, SparkLongType)))), - createUnaryStringFunction("init_cap"), - createFunctionWithInputTypes("instr", Seq(SparkStringType, SparkStringType)), - createFunctionWithInputTypes( - "length", - Seq(SparkTypeOneOf(Seq(SparkStringType, SparkBinaryType)))), - createFunctionWithInputTypes("like", Seq(SparkStringType, SparkStringType)), - createUnaryStringFunction("lower"), - createFunctions( - "lpad", - Seq( - FunctionSignature(Seq(SparkStringType, SparkIntegralType)), - FunctionSignature(Seq(SparkStringType, SparkIntegralType, SparkStringType)))), - createUnaryStringFunction("ltrim"), - createUnaryStringFunction("octet_length"), - createFunctions( - "regexp_replace", - Seq( - FunctionSignature(Seq(SparkStringType, SparkStringType, SparkStringType)), - FunctionSignature(Seq(SparkStringType, SparkStringType, SparkStringType, SparkIntType)))), - createFunctionWithInputTypes("repeat", Seq(SparkStringType, SparkIntType)), - createFunctions( - "replace", - Seq( - FunctionSignature(Seq(SparkStringType, SparkStringType)), - FunctionSignature(Seq(SparkStringType, SparkStringType, SparkStringType)))), - createFunctions( - "reverse", - Seq( - FunctionSignature(Seq(SparkStringType)), - FunctionSignature(Seq(SparkArrayType(SparkAnyType))))), - createFunctionWithInputTypes("rlike", Seq(SparkStringType, SparkStringType)), - createFunctions( - "rpad", - Seq( - FunctionSignature(Seq(SparkStringType, SparkIntegralType)), - FunctionSignature(Seq(SparkStringType, SparkIntegralType, SparkStringType)))), - createUnaryStringFunction("rtrim"), - createFunctions( - "split", - Seq( - FunctionSignature(Seq(SparkStringType, SparkStringType)), - FunctionSignature(Seq(SparkStringType, SparkStringType, SparkIntType)))), - createFunctionWithInputTypes("starts_with", Seq(SparkStringType, SparkStringType)), - createFunctionWithInputTypes("string_space", Seq(SparkIntType)), - createFunctionWithInputTypes("substring", Seq(SparkStringType, SparkIntType, SparkIntType)), - createFunctionWithInputTypes("translate", Seq(SparkStringType, SparkStringType)), - createUnaryStringFunction("trim"), - createUnaryStringFunction("btrim"), - createUnaryStringFunction("unhex"), - createUnaryStringFunction("upper"), - createFunctionWithInputTypes("xxhash64", Seq(SparkAnyType)), // TODO variadic - createFunctionWithInputTypes("sha1", Seq(SparkAnyType))) - - // Conditional expressions (corresponds to conditionalExpressions in QueryPlanSerde) - val conditionalScalarFunc: Seq[Function] = Seq( - createFunctionWithInputTypes("if", Seq(SparkBooleanType, SparkAnyType, SparkAnyType))) - - // Map expressions (corresponds to mapExpressions in QueryPlanSerde) - val mapScalarFunc: Seq[Function] = Seq( - createFunctionWithInputTypes( - "map_extract", - Seq(SparkMapType(SparkAnyType, SparkAnyType), SparkAnyType)), - createFunctionWithInputTypes("map_keys", Seq(SparkMapType(SparkAnyType, SparkAnyType))), - createFunctionWithInputTypes("map_entries", Seq(SparkMapType(SparkAnyType, SparkAnyType))), - createFunctionWithInputTypes("map_values", Seq(SparkMapType(SparkAnyType, SparkAnyType))), - createFunctionWithInputTypes( - "map_from_arrays", - Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes( - "map_from_entries", - Seq( - SparkArrayType( - SparkStructType(Seq( - SparkTypeOneOf(primitiveSparkTypes.filterNot(_ == SparkBinaryType)), - SparkTypeOneOf(primitiveSparkTypes.filterNot(_ == SparkBinaryType)))))))) - - // Predicate expressions (corresponds to predicateExpressions in QueryPlanSerde) - val predicateScalarFunc: Seq[Function] = Seq( - createFunctionWithInputTypes("and", Seq(SparkBooleanType, SparkBooleanType)), - createFunctionWithInputTypes("or", Seq(SparkBooleanType, SparkBooleanType)), - createFunctionWithInputTypes("not", Seq(SparkBooleanType)), - createFunctionWithInputTypes("in", Seq(SparkAnyType, SparkAnyType)) - ) // TODO: variadic - - // Struct expressions (corresponds to structExpressions in QueryPlanSerde) - val structScalarFunc: Seq[Function] = Seq( - createFunctionWithInputTypes( - "create_named_struct", - Seq(SparkStringType, SparkAnyType) - ), // TODO: variadic name/value pairs - createFunctionWithInputTypes( - "get_struct_field", - Seq(SparkStructType(Seq(SparkAnyType)), SparkStringType))) - - // Bitwise expressions (corresponds to bitwiseExpressions in QueryPlanSerde) - val bitwiseScalarFunc: Seq[Function] = Seq( - createFunctionWithInputTypes("bitwise_and", Seq(SparkIntegralType, SparkIntegralType)), - createFunctionWithInputTypes("bitwise_count", Seq(SparkIntegralType)), - createFunctionWithInputTypes("bitwise_get", Seq(SparkIntegralType, SparkIntType)), - createFunctionWithInputTypes("bitwise_or", Seq(SparkIntegralType, SparkIntegralType)), - createFunctionWithInputTypes("bitwise_not", Seq(SparkIntegralType)), - createFunctionWithInputTypes("bitwise_xor", Seq(SparkIntegralType, SparkIntegralType)), - createFunctionWithInputTypes("shift_left", Seq(SparkIntegralType, SparkIntType)), - createFunctionWithInputTypes("shift_right", Seq(SparkIntegralType, SparkIntType))) - - // Misc expressions (corresponds to miscExpressions in QueryPlanSerde) - val miscScalarFunc: Seq[Function] = - Seq( - createFunctionWithInputTypes("isnan", Seq(SparkNumericType)), - createFunctionWithInputTypes("isnull", Seq(SparkAnyType)), - createFunctionWithInputTypes("isnotnull", Seq(SparkAnyType)), - createFunctionWithInputTypes("coalesce", Seq(SparkAnyType, SparkAnyType)) - ) // TODO: variadic - - // Array expressions (corresponds to arrayExpressions in QueryPlanSerde) - val arrayScalarFunc: Seq[Function] = Seq( - createFunctionWithInputTypes("array_append", Seq(SparkArrayType(SparkAnyType), SparkAnyType)), - createFunctionWithInputTypes("array_compact", Seq(SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes( - "array_contains", - Seq(SparkArrayType(SparkAnyType), SparkAnyType)), - createFunctionWithInputTypes("array_distinct", Seq(SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes( - "array_except", - Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes( - "array_insert", - Seq(SparkArrayType(SparkAnyType), SparkIntType, SparkAnyType)), - createFunctionWithInputTypes( - "array_intersect", - Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))), - createFunctions( - "array_join", - Seq( - FunctionSignature(Seq(SparkArrayType(SparkAnyType), SparkStringType)), - FunctionSignature(Seq(SparkArrayType(SparkAnyType), SparkStringType, SparkStringType)))), - createFunctionWithInputTypes("array_max", Seq(SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes("array_min", Seq(SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes("array_remove", Seq(SparkArrayType(SparkAnyType), SparkAnyType)), - createFunctionWithInputTypes("array_repeat", Seq(SparkAnyType, SparkIntType)), - createFunctionWithInputTypes( - "arrays_overlap", - Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes( - "array_union", - Seq(SparkArrayType(SparkAnyType), SparkArrayType(SparkAnyType))), - createFunctionWithInputTypes("array", Seq(SparkAnyType, SparkAnyType)), // TODO: variadic - createFunctionWithInputTypes( - "element_at", - Seq( - SparkTypeOneOf( - Seq(SparkArrayType(SparkAnyType), SparkMapType(SparkAnyType, SparkAnyType))), - SparkAnyType)), - createFunctionWithInputTypes("flatten", Seq(SparkArrayType(SparkArrayType(SparkAnyType)))), - createFunctionWithInputTypes( - "get_array_item", - Seq(SparkArrayType(SparkAnyType), SparkIntType))) - - // Temporal expressions (corresponds to temporalExpressions in QueryPlanSerde) - val temporalScalarFunc: Seq[Function] = - Seq( - createFunctionWithInputTypes("date_add", Seq(SparkDateType, SparkIntType)), - createFunctionWithInputTypes("date_sub", Seq(SparkDateType, SparkIntType)), - createFunctions( - "from_unixtime", - Seq( - FunctionSignature(Seq(SparkLongType)), - FunctionSignature(Seq(SparkLongType, SparkStringType)))), - createFunctionWithInputTypes("hour", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("minute", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("second", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("trunc", Seq(SparkDateOrTimestampType, SparkStringType)), - createFunctionWithInputTypes("year", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("month", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("day", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("dayofmonth", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("dayofweek", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("weekday", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("dayofyear", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("weekofyear", Seq(SparkDateOrTimestampType)), - createFunctionWithInputTypes("quarter", Seq(SparkDateOrTimestampType))) - - // Combined in same order as exprSerdeMap in QueryPlanSerde - val scalarFunc: Seq[Function] = mathScalarFunc ++ hashScalarFunc ++ stringScalarFunc ++ - conditionalScalarFunc ++ mapScalarFunc ++ predicateScalarFunc ++ - structScalarFunc ++ bitwiseScalarFunc ++ miscScalarFunc ++ arrayScalarFunc ++ - temporalScalarFunc - - val aggFunc: Seq[Function] = Seq( - createFunctionWithInputTypes("min", Seq(SparkAnyType)), - createFunctionWithInputTypes("max", Seq(SparkAnyType)), - createFunctionWithInputTypes("count", Seq(SparkAnyType)), - createUnaryNumericFunction("avg"), - createUnaryNumericFunction("sum"), - // first/last are non-deterministic and known to be incompatible with Spark -// createFunctionWithInputTypes("first", Seq(SparkAnyType)), -// createFunctionWithInputTypes("last", Seq(SparkAnyType)), - createUnaryNumericFunction("var_pop"), - createUnaryNumericFunction("var_samp"), - createFunctionWithInputTypes("covar_pop", Seq(SparkNumericType, SparkNumericType)), - createFunctionWithInputTypes("covar_samp", Seq(SparkNumericType, SparkNumericType)), - createUnaryNumericFunction("stddev_pop"), - createUnaryNumericFunction("stddev_samp"), - createFunctionWithInputTypes("corr", Seq(SparkNumericType, SparkNumericType)), - createFunctionWithInputTypes("bit_and", Seq(SparkIntegralType)), - createFunctionWithInputTypes("bit_or", Seq(SparkIntegralType)), - createFunctionWithInputTypes("bit_xor", Seq(SparkIntegralType))) - - val unaryArithmeticOps: Seq[String] = Seq("+", "-") - - val binaryArithmeticOps: Seq[String] = - Seq("+", "-", "*", "/", "%", "&", "|", "^", "<<", ">>", "div") - - val comparisonOps: Seq[String] = Seq("=", "<=>", ">", ">=", "<", "<=") - - // TODO make this more comprehensive - val comparisonTypes: Seq[SparkType] = Seq( - SparkStringType, - SparkBinaryType, - SparkNumericType, - SparkDateType, - SparkTimestampType, - SparkArrayType(SparkTypeOneOf(Seq(SparkStringType, SparkNumericType, SparkDateType)))) - -} diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala deleted file mode 100644 index 6d404e6548..0000000000 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryGen.scala +++ /dev/null @@ -1,367 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.fuzz - -import java.io.{BufferedWriter, FileWriter} - -import scala.annotation.tailrec -import scala.collection.mutable -import scala.util.Random - -import org.apache.spark.sql.{DataFrame, SparkSession} -import org.apache.spark.sql.types._ - -object QueryGen { - - def generateRandomQueries( - r: Random, - spark: SparkSession, - numFiles: Int, - numQueries: Int): Unit = { - for (i <- 0 until numFiles) { - spark.read.parquet(s"test$i.parquet").createTempView(s"test$i") - } - - val w = new BufferedWriter(new FileWriter("queries.sql")) - - val uniqueQueries = mutable.HashSet[String]() - - for (_ <- 0 until numQueries) { - try { - val sql = r.nextInt().abs % 8 match { - case 0 => generateJoin(r, spark, numFiles) - case 1 => generateAggregate(r, spark, numFiles) - case 2 => generateScalar(r, spark, numFiles) - case 3 => generateCast(r, spark, numFiles) - case 4 => generateUnaryArithmetic(r, spark, numFiles) - case 5 => generateBinaryArithmetic(r, spark, numFiles) - case 6 => generateBinaryComparison(r, spark, numFiles) - case _ => generateConditional(r, spark, numFiles) - } - if (!uniqueQueries.contains(sql)) { - uniqueQueries += sql - w.write(sql + "\n") - } - } catch { - case e: Exception => - // scalastyle:off - println(s"Failed to generate query: ${e.getMessage}") - } - } - w.close() - } - - private def generateAggregate(r: Random, spark: SparkSession, numFiles: Int): String = { - val tableName = s"test${r.nextInt(numFiles)}" - val table = spark.table(tableName) - - val func = Utils.randomChoice(Meta.aggFunc, r) - try { - val signature = Utils.randomChoice(func.signatures, r) - val args = signature.inputTypes.map(x => pickRandomColumn(r, table, x)) - - val groupingCols = Range(0, 2).map(_ => Utils.randomChoice(table.columns, r)) - - if (groupingCols.isEmpty) { - s"SELECT ${args.mkString(", ")}, ${func.name}(${args.mkString(", ")}) AS x " + - s"FROM $tableName " + - s"ORDER BY ${args.mkString(", ")};" - } else { - s"SELECT ${groupingCols.mkString(", ")}, ${func.name}(${args.mkString(", ")}) " + - s"FROM $tableName " + - s"GROUP BY ${groupingCols.mkString(",")} " + - s"ORDER BY ${groupingCols.mkString(", ")};" - } - } catch { - case e: Exception => - throw new IllegalStateException( - s"Failed to generate SQL for aggregate function ${func.name}", - e) - } - } - - private def generateScalar(r: Random, spark: SparkSession, numFiles: Int): String = { - val tableName = s"test${r.nextInt(numFiles)}" - val table = spark.table(tableName) - - val func = Utils.randomChoice(Meta.scalarFunc, r) - try { - val signature = Utils.randomChoice(func.signatures, r) - val args = - if (signature.varArgs) { - pickRandomColumns(r, table, signature.inputTypes.head) - } else { - signature.inputTypes.map(x => pickRandomColumn(r, table, x)) - } - - // Example SELECT c0, log(c0) as x FROM test0 - s"SELECT ${args.mkString(", ")}, ${func.name}(${args.mkString(", ")}) AS x " + - s"FROM $tableName " + - s"ORDER BY ${args.mkString(", ")};" - } catch { - case e: Exception => - throw new IllegalStateException( - s"Failed to generate SQL for scalar function ${func.name}", - e) - } - } - - @tailrec - private def pickRandomColumns(r: Random, df: DataFrame, targetType: SparkType): Seq[String] = { - targetType match { - case SparkTypeOneOf(choices) => - val chosenType = Utils.randomChoice(choices, r) - pickRandomColumns(r, df, chosenType) - case _ => - var columns = Set.empty[String] - for (_ <- 0 to r.nextInt(df.columns.length)) { - columns += pickRandomColumn(r, df, targetType) - } - columns.toSeq - } - } - - private def pickRandomColumn(r: Random, df: DataFrame, targetType: SparkType): String = { - targetType match { - case SparkAnyType => - Utils.randomChoice(df.schema.fields, r).name - case SparkBooleanType => - select(r, df, _.dataType == BooleanType) - case SparkByteType => - select(r, df, _.dataType == ByteType) - case SparkShortType => - select(r, df, _.dataType == ShortType) - case SparkIntType => - select(r, df, _.dataType == IntegerType) - case SparkLongType => - select(r, df, _.dataType == LongType) - case SparkFloatType => - select(r, df, _.dataType == FloatType) - case SparkDoubleType => - select(r, df, _.dataType == DoubleType) - case SparkDecimalType(_, _) => - select(r, df, _.dataType.isInstanceOf[DecimalType]) - case SparkIntegralType => - select( - r, - df, - f => - f.dataType == ByteType || f.dataType == ShortType || - f.dataType == IntegerType || f.dataType == LongType) - case SparkNumericType => - select(r, df, f => isNumeric(f.dataType)) - case SparkStringType => - select(r, df, _.dataType == StringType) - case SparkBinaryType => - select(r, df, _.dataType == BinaryType) - case SparkDateType => - select(r, df, _.dataType == DateType) - case SparkTimestampType => - select(r, df, _.dataType == TimestampType) - case SparkDateOrTimestampType => - select(r, df, f => f.dataType == DateType || f.dataType == TimestampType) - case SparkTypeOneOf(choices) => - pickRandomColumn(r, df, Utils.randomChoice(choices, r)) - case SparkArrayType(elementType) => - select( - r, - df, - _.dataType match { - case ArrayType(x, _) if typeMatch(elementType, x) => true - case _ => false - }) - case SparkMapType(keyType, valueType) => - select( - r, - df, - _.dataType match { - case MapType(k, v, _) if typeMatch(keyType, k) && typeMatch(valueType, v) => true - case _ => false - }) - case SparkStructType(fields) => - select( - r, - df, - _.dataType match { - case StructType(structFields) if structFields.length == fields.length => true - case _ => false - }) - case _ => - throw new IllegalStateException(targetType.toString) - } - } - - def pickTwoRandomColumns(r: Random, df: DataFrame, targetType: SparkType): (String, String) = { - val a = pickRandomColumn(r, df, targetType) - val df2 = df.drop(a) - val b = pickRandomColumn(r, df2, targetType) - (a, b) - } - - /** Select a random field that matches a predicate */ - private def select(r: Random, df: DataFrame, predicate: StructField => Boolean): String = { - val candidates = df.schema.fields.filter(predicate) - if (candidates.isEmpty) { - throw new IllegalStateException("Failed to find suitable column") - } - Utils.randomChoice(candidates, r).name - } - - private def isNumeric(d: DataType): Boolean = { - d match { - case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType | - _: DoubleType | _: DecimalType => - true - case _ => false - } - } - - private def typeMatch(s: SparkType, d: DataType): Boolean = { - (s, d) match { - case (SparkAnyType, _) => true - case (SparkBooleanType, BooleanType) => true - case (SparkByteType, ByteType) => true - case (SparkShortType, ShortType) => true - case (SparkIntType, IntegerType) => true - case (SparkLongType, LongType) => true - case (SparkFloatType, FloatType) => true - case (SparkDoubleType, DoubleType) => true - case (SparkDecimalType(_, _), _: DecimalType) => true - case (SparkIntegralType, ByteType | ShortType | IntegerType | LongType) => true - case (SparkNumericType, _) if isNumeric(d) => true - case (SparkStringType, StringType) => true - case (SparkBinaryType, BinaryType) => true - case (SparkDateType, DateType) => true - case (SparkTimestampType, TimestampType | TimestampNTZType) => true - case (SparkDateOrTimestampType, DateType | TimestampType | TimestampNTZType) => true - case (SparkArrayType(elementType), ArrayType(elementDataType, _)) => - typeMatch(elementType, elementDataType) - case (SparkMapType(keyType, valueType), MapType(keyDataType, valueDataType, _)) => - typeMatch(keyType, keyDataType) && typeMatch(valueType, valueDataType) - case (SparkStructType(fields), StructType(structFields)) => - fields.length == structFields.length && - fields.zip(structFields.map(_.dataType)).forall { case (sparkType, dataType) => - typeMatch(sparkType, dataType) - } - case (SparkTypeOneOf(choices), _) => - choices.exists(choice => typeMatch(choice, d)) - case _ => false - } - } - - private def generateUnaryArithmetic(r: Random, spark: SparkSession, numFiles: Int): String = { - val tableName = s"test${r.nextInt(numFiles)}" - val table = spark.table(tableName) - - val op = Utils.randomChoice(Meta.unaryArithmeticOps, r) - val a = pickRandomColumn(r, table, SparkNumericType) - - // Example SELECT a, -a FROM test0 - s"SELECT $a, $op$a " + - s"FROM $tableName " + - s"ORDER BY $a;" - } - - private def generateBinaryArithmetic(r: Random, spark: SparkSession, numFiles: Int): String = { - val tableName = s"test${r.nextInt(numFiles)}" - val table = spark.table(tableName) - - val op = Utils.randomChoice(Meta.binaryArithmeticOps, r) - val (a, b) = pickTwoRandomColumns(r, table, SparkNumericType) - - // Example SELECT a, b, a+b FROM test0 - s"SELECT $a, $b, $a $op $b " + - s"FROM $tableName " + - s"ORDER BY $a, $b;" - } - - private def generateBinaryComparison(r: Random, spark: SparkSession, numFiles: Int): String = { - val tableName = s"test${r.nextInt(numFiles)}" - val table = spark.table(tableName) - - val op = Utils.randomChoice(Meta.comparisonOps, r) - - // pick two columns with the same type - val opType = Utils.randomChoice(Meta.comparisonTypes, r) - val (a, b) = pickTwoRandomColumns(r, table, opType) - - // Example SELECT a, b, a <=> b FROM test0 - s"SELECT $a, $b, $a $op $b " + - s"FROM $tableName " + - s"ORDER BY $a, $b;" - } - - private def generateConditional(r: Random, spark: SparkSession, numFiles: Int): String = { - val tableName = s"test${r.nextInt(numFiles)}" - val table = spark.table(tableName) - - val op = Utils.randomChoice(Meta.comparisonOps, r) - - // pick two columns with the same type - val opType = Utils.randomChoice(Meta.comparisonTypes, r) - val (a, b) = pickTwoRandomColumns(r, table, opType) - - // Example SELECT a, b, IF(a <=> b, 1, 2), CASE WHEN a <=> b THEN 1 ELSE 2 END FROM test0 - s"SELECT $a, $b, $a $op $b, IF($a $op $b, 1, 2), CASE WHEN $a $op $b THEN 1 ELSE 2 END " + - s"FROM $tableName " + - s"ORDER BY $a, $b;" - } - - private def generateCast(r: Random, spark: SparkSession, numFiles: Int): String = { - val tableName = s"test${r.nextInt(numFiles)}" - val table = spark.table(tableName) - - val toType = Utils.randomWeightedChoice(Meta.dataTypes, r).sql - val arg = Utils.randomChoice(table.columns, r) - - // We test both `cast` and `try_cast` to cover LEGACY and TRY eval modes. It is not - // recommended to run Comet Fuzz with ANSI enabled currently. - // Example SELECT c0, cast(c0 as float), try_cast(c0 as float) FROM test0 - s"SELECT $arg, cast($arg as $toType), try_cast($arg as $toType) " + - s"FROM $tableName " + - s"ORDER BY $arg;" - } - - private def generateJoin(r: Random, spark: SparkSession, numFiles: Int): String = { - val leftTableName = s"test${r.nextInt(numFiles)}" - val rightTableName = s"test${r.nextInt(numFiles)}" - val leftTable = spark.table(leftTableName) - val rightTable = spark.table(rightTableName) - - val leftCol = Utils.randomChoice(leftTable.columns, r) - val rightCol = Utils.randomChoice(rightTable.columns, r) - - val joinTypes = Seq(("INNER", 0.4), ("LEFT", 0.3), ("RIGHT", 0.3)) - val joinType = Utils.randomWeightedChoice(joinTypes, r) - - val leftColProjection = leftTable.columns.map(c => s"l.$c").mkString(", ") - val rightColProjection = rightTable.columns.map(c => s"r.$c").mkString(", ") - "SELECT " + - s"$leftColProjection, " + - s"$rightColProjection " + - s"FROM $leftTableName l " + - s"$joinType JOIN $rightTableName r " + - s"ON l.$leftCol = r.$rightCol " + - "ORDER BY " + - s"$leftColProjection, " + - s"$rightColProjection;" - } - -} diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala deleted file mode 100644 index dc7189c533..0000000000 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.fuzz - -import java.io.{BufferedWriter, FileWriter, PrintWriter, StringWriter} - -import scala.collection.mutable -import scala.io.Source - -import org.apache.spark.sql.{Row, SparkSession} - -import org.apache.comet.fuzz.QueryComparison.showPlans - -object QueryRunner { - - def createOutputMdFile(): BufferedWriter = { - val outputFilename = s"results-${System.currentTimeMillis()}.md" - // scalastyle:off println - println(s"Writing results to $outputFilename") - // scalastyle:on println - - new BufferedWriter(new FileWriter(outputFilename)) - } - - def runQueries( - spark: SparkSession, - numFiles: Int, - filename: String, - showFailedSparkQueries: Boolean = false): Unit = { - - var queryCount = 0 - var invalidQueryCount = 0 - var cometFailureCount = 0 - var cometSuccessCount = 0 - - val w = createOutputMdFile() - - // register input files - for (i <- 0 until numFiles) { - val table = spark.read.parquet(s"test$i.parquet") - val tableName = s"test$i" - table.createTempView(tableName) - w.write( - s"Created table $tableName with schema:\n\t" + - s"${table.schema.fields.map(f => s"${f.name}: ${f.dataType}").mkString("\n\t")}\n\n") - } - - val querySource = Source.fromFile(filename) - try { - querySource - .getLines() - .foreach(sql => { - queryCount += 1 - try { - // execute with Spark - spark.conf.set("spark.comet.enabled", "false") - val df = spark.sql(sql) - val sparkRows = df.collect() - val sparkPlan = df.queryExecution.executedPlan.toString - - // execute with Comet - try { - spark.conf.set("spark.comet.enabled", "true") - val df = spark.sql(sql) - val cometRows = df.collect() - val cometPlan = df.queryExecution.executedPlan.toString - - var success = QueryComparison.assertSameRows(sparkRows, cometRows, output = w) - - // check that the plan contains Comet operators - if (!cometPlan.contains("Comet")) { - success = false - w.write("[ERROR] Comet did not accelerate any part of the plan\n") - } - - QueryComparison.showSQL(w, sql) - - if (success) { - cometSuccessCount += 1 - } else { - // show plans for failed queries - showPlans(w, sparkPlan, cometPlan) - cometFailureCount += 1 - } - - } catch { - case e: Exception => - // the query worked in Spark but failed in Comet, so this is likely a bug in Comet - cometFailureCount += 1 - QueryComparison.showSQL(w, sql) - w.write("### Spark Plan\n") - w.write(s"```\n$sparkPlan\n```\n") - - w.write(s"[ERROR] Query failed in Comet: ${e.getMessage}:\n") - w.write("```\n") - val sw = new StringWriter() - val p = new PrintWriter(sw) - e.printStackTrace(p) - p.close() - w.write(s"${sw.toString}\n") - w.write("```\n") - } - - // flush after every query so that results are saved in the event of the driver crashing - w.flush() - - } catch { - case e: Exception => - // we expect many generated queries to be invalid - invalidQueryCount += 1 - if (showFailedSparkQueries) { - QueryComparison.showSQL(w, sql) - w.write(s"Query failed in Spark: ${e.getMessage}\n") - } - } - }) - - w.write("# Summary\n") - w.write( - s"Total queries: $queryCount; Invalid queries: $invalidQueryCount; " + - s"Comet failed: $cometFailureCount; Comet succeeded: $cometSuccessCount\n") - - } finally { - w.close() - querySource.close() - } - } -} - -object QueryComparison { - def assertSameRows( - sparkRows: Array[Row], - cometRows: Array[Row], - output: BufferedWriter, - tolerance: Double = 0.000001): Boolean = { - if (sparkRows.length == cometRows.length) { - var i = 0 - while (i < sparkRows.length) { - val l = sparkRows(i) - val r = cometRows(i) - - // Check the schema is equal for first row only - if (i == 0 && l.schema != r.schema) { - output.write("[ERROR] Spark produced different schema than Comet.\n") - - return false - } - - assert(l.length == r.length) - for (j <- 0 until l.length) { - if (!same(l(j), r(j), tolerance)) { - output.write(s"First difference at row $i:\n") - output.write("Spark: `" + formatRow(l) + "`\n") - output.write("Comet: `" + formatRow(r) + "`\n") - i = sparkRows.length - - return false - } - } - i += 1 - } - } else { - output.write( - s"[ERROR] Spark produced ${sparkRows.length} rows and " + - s"Comet produced ${cometRows.length} rows.\n") - - return false - } - - true - } - - private def same(l: Any, r: Any, tolerance: Double): Boolean = { - if (l == null || r == null) { - return l == null && r == null - } - (l, r) match { - case (a: Float, b: Float) if a.isPosInfinity => b.isPosInfinity - case (a: Float, b: Float) if a.isNegInfinity => b.isNegInfinity - case (a: Float, b: Float) if a.isInfinity => b.isInfinity - case (a: Float, b: Float) if a.isNaN => b.isNaN - case (a: Float, b: Float) => (a - b).abs <= tolerance - case (a: Double, b: Double) if a.isPosInfinity => b.isPosInfinity - case (a: Double, b: Double) if a.isNegInfinity => b.isNegInfinity - case (a: Double, b: Double) if a.isInfinity => b.isInfinity - case (a: Double, b: Double) if a.isNaN => b.isNaN - case (a: Double, b: Double) => (a - b).abs <= tolerance - case (a: Array[_], b: Array[_]) => - a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance)) - case (a: mutable.WrappedArray[_], b: mutable.WrappedArray[_]) => - a.length == b.length && a.zip(b).forall(x => same(x._1, x._2, tolerance)) - case (a: Row, b: Row) => - val aa = a.toSeq - val bb = b.toSeq - aa.length == bb.length && aa.zip(bb).forall(x => same(x._1, x._2, tolerance)) - case (a, b) => a == b - } - } - - private def format(value: Any): String = { - value match { - case null => "NULL" - case v: mutable.WrappedArray[_] => s"[${v.map(format).mkString(",")}]" - case v: Array[Byte] => s"[${v.mkString(",")}]" - case r: Row => formatRow(r) - case other => other.toString - } - } - - private def formatRow(row: Row): String = { - row.toSeq.map(format).mkString(",") - } - - def showSQL(w: BufferedWriter, sql: String, maxLength: Int = 120): Unit = { - w.write("## SQL\n") - w.write("```\n") - val words = sql.split(" ") - val currentLine = new StringBuilder - for (word <- words) { - if (currentLine.length + word.length + 1 > maxLength) { - w.write(currentLine.toString.trim) - w.write("\n") - currentLine.setLength(0) - } - currentLine.append(word).append(" ") - } - if (currentLine.nonEmpty) { - w.write(currentLine.toString.trim) - w.write("\n") - } - w.write("```\n") - } - - def showPlans(w: BufferedWriter, sparkPlan: String, cometPlan: String): Unit = { - w.write("### Spark Plan\n") - w.write(s"```\n$sparkPlan\n```\n") - w.write("### Comet Plan\n") - w.write(s"```\n$cometPlan\n```\n") - } - - def showSchema(w: BufferedWriter, sparkSchema: String, cometSchema: String): Unit = { - w.write("### Spark Schema\n") - w.write(s"```\n$sparkSchema\n```\n") - w.write("### Comet Schema\n") - w.write(s"```\n$cometSchema\n```\n") - } -} diff --git a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala b/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala deleted file mode 100644 index 4d51c60e54..0000000000 --- a/fuzz-testing/src/main/scala/org/apache/comet/fuzz/Utils.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.fuzz - -import scala.util.Random - -object Utils { - - def randomChoice[T](list: Seq[T], r: Random): T = { - list(r.nextInt(list.length)) - } - - def randomWeightedChoice[T](valuesWithWeights: Seq[(T, Double)], r: Random): T = { - val totalWeight = valuesWithWeights.map(_._2).sum - val randomValue = r.nextDouble() * totalWeight - var cumulativeWeight = 0.0 - - for ((value, weight) <- valuesWithWeights) { - cumulativeWeight += weight - if (cumulativeWeight >= randomValue) { - return value - } - } - - // If for some reason the loop doesn't return, return the last value - valuesWithWeights.last._1 - } - -} diff --git a/kube/Dockerfile b/kube/Dockerfile index 699aeeb210..5c0e6d3b66 100644 --- a/kube/Dockerfile +++ b/kube/Dockerfile @@ -55,7 +55,6 @@ COPY mvnw /comet/mvnw COPY common /comet/common COPY dev /comet/dev COPY docs /comet/docs -COPY fuzz-testing /comet/fuzz-testing COPY spark /comet/spark COPY spark-integration /comet/spark-integration COPY scalafmt.conf /comet/scalafmt.conf diff --git a/pom.xml b/pom.xml index d1c81b8352..22c97ea1eb 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,6 @@ under the License. common spark spark-integration - fuzz-testing