From d474ba37571233db0aaf317291a5e4e6eab12b01 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 11:33:06 -0600
Subject: [PATCH 01/12] test: add ParquetSchemaMismatchSuite skeleton for
 issue #3720

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 76 +++++++++++++++++++
 1 file changed, 76 insertions(+)
 create mode 100644 spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
new file mode 100644
index 0000000000..190f231d54
--- /dev/null
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.parquet
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.example.data.simple.SimpleGroup
+import org.apache.parquet.schema.MessageTypeParser
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{CometTestBase, DataFrame}
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.comet.CometConf
+
+/**
+ * Documents Comet's behavior for the Parquet read-schema/file-schema mismatch cases tracked in
+ * https://github.com/apache/datafusion-comet/issues/3720.
+ *
+ * Each test exercises one case under one of the two Comet scan implementations
+ * (`native_datafusion`, `native_iceberg_compat`). Assertions encode Comet's actual current
+ * behavior. Spark's reference behavior is recorded in the per-case comments and in the matrix
+ * below; assertions do not run Spark in isolation.
+ *
+ * Behavior matrix (Spark reference behavior; Comet behavior is asserted by each test). "OK" =
+ * read succeeds. "throw" = SparkException at runtime.
+ *
+ * Case 3.4 3.5 4.0
+ * 1. BINARY -> TIMESTAMP throw throw throw 2. INT32 -> INT64 throw throw OK (widening) 3. INT96
+ * LTZ -> TIMESTAMP_NTZ throw throw throw 4. Decimal(10,2) -> Decimal(5,0) throw throw throw
+ * 5. INT32 -> INT64 with rowgroup filter throw throw OK 6. STRING -> INT throw throw throw
+ * 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw C1. INT8 -> INT32 OK OK OK C2. FLOAT ->
+ * DOUBLE OK OK OK
+ *
+ * If a Comet fix lands that aligns one of these cases with Spark, update the affected test(s) and
+ * this matrix in the same PR.
+ */
+class ParquetSchemaMismatchSuite extends CometTestBase {
+  import testImplicits._
+
+  /**
+   * Force a specific Comet scan implementation, force V1 datasource (both native_datafusion and
+   * native_iceberg_compat are V1-only), then run the given block in a fresh temp directory. The
+   * block writes Parquet under `path`, then reads it back with a mismatched schema.
+   */
+  private def withMismatchedSchema(scanImpl: String)(body: String => Unit): Unit = {
+    withSQLConf(
+      CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
+      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath { dir =>
+        body(dir.getCanonicalPath)
+      }
+    }
+  }
+
+  /** Both scan implementations under test, used as a `foreach` driver. */
+  private val scanImpls: Seq[String] =
+    Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+}
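
A sketch of how a case would drive the patch-01 helper as it stands, with
writing, reading, and asserting all inside the single body lambda
(hypothetical test, not part of the series; the real cases land in the
patches below, after the helper is reworked):

    // Hypothetical usage of the single-lambda helper from this patch. Any
    // assertion forcing the DataFrame (collect, checkAnswer) must run inside
    // the lambda, while the temp directory still exists.
    scanImpls.foreach { scanImpl =>
      test(s"example mismatch: $scanImpl") {
        withMismatchedSchema(scanImpl) { path =>
          Seq(1, 2, 3).toDF("c").write.parquet(path) // file schema: INT32
          val df = spark.read.schema("c bigint").parquet(path) // read schema: INT64
          df.collect() // must happen here; `path` is deleted after this block
        }
      }
    }
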
From bee01e8fc026584d1fdd5f0f95309d0cb1985325 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 11:46:18 -0600
Subject: [PATCH 02/12] test: case 1 binary read as timestamp

Both native_datafusion and native_iceberg_compat throw SparkException
(matching Spark's reference behavior). The withMismatchedSchema helper
was redesigned to accept a separate check lambda so collect() executes
while the temp directory is still present.
---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 68 +++++++++++++++----
 1 file changed, 54 insertions(+), 14 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 190f231d54..651f6513a3 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -39,33 +39,40 @@ import org.apache.comet.CometConf
  * behavior. Spark's reference behavior is recorded in the per-case comments and in the matrix
  * below; assertions do not run Spark in isolation.
  *
- * Behavior matrix (Spark reference behavior; Comet behavior is asserted by each test). "OK" =
- * read succeeds. "throw" = SparkException at runtime.
- *
- * Case 3.4 3.5 4.0
- * 1. BINARY -> TIMESTAMP throw throw throw 2. INT32 -> INT64 throw throw OK (widening) 3. INT96
- * LTZ -> TIMESTAMP_NTZ throw throw throw 4. Decimal(10,2) -> Decimal(5,0) throw throw throw
- * 5. INT32 -> INT64 with rowgroup filter throw throw OK 6. STRING -> INT throw throw throw
- * 7. TIMESTAMP_NTZ -> ARRAY<...> throw throw throw C1. INT8 -> INT32 OK OK OK C2. FLOAT ->
- * DOUBLE OK OK OK
- *
  * If a Comet fix lands that aligns one of these cases with Spark, update the affected test(s) and
- * this matrix in the same PR.
+ * the matrix below in the same PR.
  */
+// Behavior matrix (Spark reference behavior; Comet behavior is asserted by each
+// test). "OK" = read succeeds. "throw" = SparkException at runtime.
+//
+// Case                                     3.4    3.5    4.0
+// 1. BINARY -> TIMESTAMP                   throw  throw  throw
+// 2. INT32 -> INT64                        throw  throw  OK (widening)
+// 3. INT96 LTZ -> TIMESTAMP_NTZ            throw  throw  throw
+// 4. Decimal(10,2) -> Decimal(5,0)         throw  throw  throw
+// 5. INT32 -> INT64 with rowgroup filter   throw  throw  OK
+// 6. STRING -> INT                         throw  throw  throw
+// 7. TIMESTAMP_NTZ -> ARRAY<...>           throw  throw  throw
+// C1. INT8 -> INT32                        OK     OK     OK
+// C2. FLOAT -> DOUBLE                      OK     OK     OK
 class ParquetSchemaMismatchSuite extends CometTestBase {
   import testImplicits._
 
   /**
    * Force a specific Comet scan implementation, force V1 datasource (both native_datafusion and
    * native_iceberg_compat are V1-only), then run the given block in a fresh temp directory. The
-   * block writes Parquet under `path`, then reads it back with a mismatched schema.
+   * block writes Parquet under `path`, builds a DataFrame with a mismatched schema, and runs
+   * assertions inside `check`. The temp directory (and its files) is present for the entire
+   * duration of `body` and `check`, so `collect()` and other actions are safe inside `check`.
    */
-  private def withMismatchedSchema(scanImpl: String)(body: String => Unit): Unit = {
+  private def withMismatchedSchema(scanImpl: String)(body: String => DataFrame)(
+      check: DataFrame => Unit): Unit = {
     withSQLConf(
       CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl,
       SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempPath { dir =>
-        body(dir.getCanonicalPath)
+        val df = body(dir.getCanonicalPath)
+        check(df)
       }
     }
   }
@@ -73,4 +80,37 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
   /** Both scan implementations under test, used as a `foreach` driver. */
   private val scanImpls: Seq[String] =
     Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
+
+  // Case 1: BINARY read as TIMESTAMP. Spark throws SparkException on all
+  // versions. Both Comet scan implementations also throw: native_datafusion
+  // raises CometNativeException (column type mismatch); native_iceberg_compat
+  // raises SparkException (SchemaColumnConvertNotSupportedException). Both
+  // surface to the caller as SparkException.
+  scanImpls.foreach { scanImpl =>
+    test(s"binary read as timestamp: $scanImpl") {
+      withMismatchedSchema(scanImpl) { path =>
+        val schemaStr =
+          """message root {
+            |  optional binary _1;
+            |}
+          """.stripMargin
+        val schema = MessageTypeParser.parseMessageType(schemaStr)
+        val writer = createParquetWriter(schema, new Path(path, "part-r-0.parquet"))
+        (0 until 10).foreach { i =>
+          val record = new SimpleGroup(schema)
+          record.add(0, s"value-$i")
+          writer.write(record)
+        }
+        writer.close()
+        spark.read.schema("_1 timestamp").parquet(path)
+      } { df =>
+        // Pattern 3 (throw): both scan implementations throw SparkException at
+        // collect time; the error message differs but the exception type is the
+        // same. Behavior matches Spark's reference behavior on all versions.
+        intercept[SparkException] {
+          df.collect()
+        }
+      }
+    }
+  }
 }
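
The body/check split in this patch matters because DataFrames are lazy: a
DataFrame returned out of the temp-path scope still points at the deleted
directory. A minimal sketch of the anti-pattern the redesign rules out
(hypothetical; assumes withTempPath deletes its directory on exit, as
Spark's SQLTestUtils does):

    // Anti-pattern: letting the lazy DataFrame escape the temp-path scope.
    var leaked: DataFrame = null
    withTempPath { dir =>
      Seq(1, 2, 3).toDF("c").write.parquet(dir.getCanonicalPath)
      leaked = spark.read.parquet(dir.getCanonicalPath) // plan only, no scan yet
    }
    // The directory is gone; forcing the plan now fails at runtime with a
    // SparkException wrapping the underlying FileNotFoundException.
    intercept[SparkException] {
      leaked.collect()
    }
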
From e19a5d227c344c283bc76af33fc8dca825c43174 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 11:49:53 -0600
Subject: [PATCH 03/12] test: case 2 int32 read as int64

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 48 +++++++++++++++----
 1 file changed, 38 insertions(+), 10 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 651f6513a3..b604c27f83 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -45,16 +45,16 @@ import org.apache.comet.CometConf
 // Behavior matrix (Spark reference behavior; Comet behavior is asserted by each
 // test). "OK" = read succeeds. "throw" = SparkException at runtime.
 //
-// Case                                     3.4    3.5    4.0
-// 1. BINARY -> TIMESTAMP                   throw  throw  throw
-// 2. INT32 -> INT64                        throw  throw  OK (widening)
-// 3. INT96 LTZ -> TIMESTAMP_NTZ            throw  throw  throw
-// 4. Decimal(10,2) -> Decimal(5,0)         throw  throw  throw
-// 5. INT32 -> INT64 with rowgroup filter   throw  throw  OK
-// 6. STRING -> INT                         throw  throw  throw
-// 7. TIMESTAMP_NTZ -> ARRAY<...>           throw  throw  throw
-// C1. INT8 -> INT32                        OK     OK     OK
-// C2. FLOAT -> DOUBLE                      OK     OK     OK
+// Case                                 Spark 3.4  3.5    4.0    Comet native_datafusion                 Comet native_iceberg_compat
+// 1. BINARY -> TIMESTAMP               throw      throw  throw  throw                                   throw
+// 2. INT32 -> INT64                    throw      throw  OK     OK (widened values)                     throw
+// 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  ?                                       ?
+// 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  ?                                       ?
+// 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     ?                                       ?
+// 6. STRING -> INT                     throw      throw  throw  ?                                       ?
+// 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  ?                                       ?
+// C1. INT8 -> INT32                    OK         OK     OK     ?                                       ?
+// C2. FLOAT -> DOUBLE                  OK         OK     OK     ?                                       ?
 class ParquetSchemaMismatchSuite extends CometTestBase {
   import testImplicits._
 
@@ -113,4 +113,32 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       }
     }
   }
+
+  // Case 2: INT32 read as INT64 (value-preserving widening). Spark 3.4/3.5
+  // throw SparkException; Spark 4.0 allows widening.
+  // native_datafusion: succeeds with widened values (Pattern 1).
+  // native_iceberg_compat: throws SparkException (SchemaColumnConvertNotSupportedException
+  // from TypeUtil.checkParquetType); does not support INT32->INT64 widening.
+  test(s"int32 read as int64: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      Seq(1, 2, 3).toDF("c").write.parquet(path)
+      spark.read.schema("c bigint").parquet(path)
+    } { df =>
+      // Pattern 1 (value-preserving widening).
+      checkAnswer(df, Seq(1L, 2L, 3L).map(org.apache.spark.sql.Row(_)))
+    }
+  }
+
+  test(s"int32 read as int64: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq(1, 2, 3).toDF("c").write.parquet(path)
+      spark.read.schema("c bigint").parquet(path)
+    } { df =>
+      // Pattern 3 (throw): native_iceberg_compat rejects INT32->INT64 widening
+      // via TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException).
+      intercept[SparkException] {
+        df.collect()
+      }
+    }
+  }
 }
From 1aa00df9b31a6f057b3f86d2b4be03d4df08c5ca Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 11:53:34 -0600
Subject: [PATCH 04/12] test: case 3 timestamp_ltz read as timestamp_ntz

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 45 ++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index b604c27f83..547c11f60e 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -48,7 +48,7 @@ import org.apache.comet.CometConf
 // Case                                 Spark 3.4  3.5    4.0    Comet native_datafusion                 Comet native_iceberg_compat
 // 1. BINARY -> TIMESTAMP               throw      throw  throw  throw                                   throw
 // 2. INT32 -> INT64                    throw      throw  OK     OK (widened values)                     throw
-// 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  ?                                       ?
+// 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  OK (silent, possible wall-clock diff)   throw
 // 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  ?                                       ?
 // 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     ?                                       ?
 // 6. STRING -> INT                     throw      throw  throw  ?                                       ?
@@ -141,4 +141,47 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       }
     }
   }
+
+  // Case 3: INT96 TimestampLTZ read as TimestampNTZ. Spark throws on all
+  // versions (SPARK-36182). INT96 carries no timezone info in the Parquet
+  // schema, so native_datafusion cannot detect the LTZ -> NTZ mismatch and
+  // silently reads (possibly with a wrong wall-clock value).
+  // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ
+  // (mirrors Spark's behavior).
+  test(s"int96 timestamp_ltz read as timestamp_ntz: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
+        Seq(java.sql.Timestamp.valueOf("2020-01-01 00:00:00"))
+          .toDF("ts")
+          .write
+          .parquet(path)
+      }
+      spark.read.schema("ts timestamp_ntz").parquet(path)
+    } { df =>
+      // native_datafusion succeeds silently: INT96 carries no timezone info so
+      // the LTZ -> NTZ mismatch is undetectable; result may have a wrong
+      // wall-clock value depending on the executor timezone.
+      val outcome = Try(df.collect())
+      assert(outcome.isSuccess, s"unexpected failure: $outcome")
+      assert(outcome.get.length == 1)
+    }
+  }
+
+  test(s"int96 timestamp_ltz read as timestamp_ntz: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
+        Seq(java.sql.Timestamp.valueOf("2020-01-01 00:00:00"))
+          .toDF("ts")
+          .write
+          .parquet(path)
+      }
+      spark.read.schema("ts timestamp_ntz").parquet(path)
+    } { df =>
+      // native_iceberg_compat throws SparkException via
+      // TypeUtil.convertErrorForTimestampNTZ; matches Spark's behavior.
+      intercept[SparkException] {
+        df.collect()
+      }
+    }
+  }
 }
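
The "possibly wrong wall-clock value" in case 3 is ordinary timezone
reinterpretation: LTZ names an instant, and rendering that instant as NTZ
depends on which zone does the conversion. The arithmetic in isolation,
using only java.time (illustrative; the zone names are arbitrary):

    import java.time.{LocalDateTime, ZoneId}

    // Writing "2020-01-01 00:00:00" as TIMESTAMP_LTZ from an America/Denver
    // session stores the instant 2020-01-01T07:00:00Z.
    val instant = LocalDateTime
      .parse("2020-01-01T00:00:00")
      .atZone(ZoneId.of("America/Denver"))
      .toInstant

    // Reinterpreting the stored value as NTZ in a UTC session yields a
    // different wall clock -- the silent divergence the test cannot pin down.
    val ntzInUtc = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"))
    assert(ntzInUtc.toString == "2020-01-01T07:00")
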
From 1d139ee4d9e7e503536307e3b952ff9f5fe5ca2c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 11:56:28 -0600
Subject: [PATCH 05/12] test: case 4 incompatible decimal precision/scale

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 38 ++++++++++++++++++-
 1 file changed, 37 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 547c11f60e..e68e3ccd77 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -49,7 +49,7 @@ import org.apache.comet.CometConf
 // 1. BINARY -> TIMESTAMP               throw      throw  throw  throw                                   throw
 // 2. INT32 -> INT64                    throw      throw  OK     OK (widened values)                     throw
 // 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  OK (silent, possible wall-clock diff)   throw
-// 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  ?                                       ?
+// 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  OK (reads, values unverified)           throw
 // 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     ?                                       ?
 // 6. STRING -> INT                     throw      throw  throw  ?                                       ?
 // 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  ?                                       ?
@@ -184,4 +184,40 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       }
     }
   }
+
+  // Case 4: Decimal(10,2) read as Decimal(5,0). Reading from a higher-precision
+  // decimal as a lower-precision decimal can lose data (123.45 cannot fit in
+  // decimal(5,0)). Spark throws on all versions (SPARK-34212).
+  test(s"decimal(10,2) read as decimal(5,0): ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      Seq(BigDecimal("123.45"), BigDecimal("67.89"))
+        .toDF("d")
+        .selectExpr("cast(d as decimal(10,2)) as d")
+        .write
+        .parquet(path)
+      spark.read.schema("d decimal(5,0)").parquet(path)
+    } { df =>
+      // Pattern 2 (silent read): values unverified; capture the observed outcome.
+      val outcome = Try(df.collect())
+      assert(outcome.isSuccess, s"unexpected failure: $outcome")
+    }
+  }
+
+  test(s"decimal(10,2) read as decimal(5,0): ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq(BigDecimal("123.45"), BigDecimal("67.89"))
+        .toDF("d")
+        .selectExpr("cast(d as decimal(10,2)) as d")
+        .write
+        .parquet(path)
+      spark.read.schema("d decimal(5,0)").parquet(path)
+    } { df =>
+      // native_iceberg_compat throws SparkException via
+      // SchemaColumnConvertNotSupportedException (INT64 cannot convert to decimal(5,0));
+      // matches Spark's reference behavior.
+      intercept[SparkException] {
+        df.collect()
+      }
+    }
+  }
 }
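
The data loss behind case 4 is visible without any Parquet machinery:
decimal(5,0) has no fractional digits, so 123.45 cannot round-trip. A
self-contained sketch with java.math:

    import java.math.{BigDecimal => JBigDecimal, RoundingMode}

    val original = new JBigDecimal("123.45") // fits decimal(10,2)

    // Rescaling to scale 0 requires rounding: setScale(0) alone throws
    // ArithmeticException, and an explicit mode silently drops the fraction.
    val truncated = original.setScale(0, RoundingMode.DOWN)
    assert(truncated.toString == "123")
    assert(truncated.compareTo(original) != 0) // information lost
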
From 69b1457d84180be2200ae046693992a5c9cfa865 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 11:58:45 -0600
Subject: [PATCH 06/12] test: case 5 int32 as int64 with row group filter

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 35 ++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index e68e3ccd77..8f974c151d 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -50,7 +50,7 @@ import org.apache.comet.CometConf
 // 2. INT32 -> INT64                    throw      throw  OK     OK (widened values)                     throw
 // 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  OK (silent, possible wall-clock diff)   throw
 // 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  OK (reads, values unverified)           throw
-// 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     ?                                       ?
+// 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     OK (1 row, no overflow)                 throw
 // 6. STRING -> INT                     throw      throw  throw  ?                                       ?
 // 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  ?                                       ?
 // C1. INT8 -> INT32                    OK         OK     OK     ?                                       ?
@@ -220,4 +220,37 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       }
     }
   }
+
+  // Case 5: regression guard for row group skipping. Write INT32 values near
+  // INT32 max, read as INT64 with a filter constant at the top of the INT32
+  // range. If the scan treats the filter as INT32, row-group skipping might
+  // overflow and skip rows that should match.
+  test(s"int32 read as int64 with row group filter: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      Seq(Int.MaxValue - 2, Int.MaxValue - 1, Int.MaxValue).toDF("c").write.parquet(path)
+      spark.read
+        .schema("c bigint")
+        .parquet(path)
+        .filter(s"c > ${Int.MaxValue.toLong - 1L}")
+    } { df =>
+      // Pattern 1: filter must not overflow when widened.
+      checkAnswer(df, Seq(Int.MaxValue.toLong).map(org.apache.spark.sql.Row(_)))
+    }
+  }
+
+  test(s"int32 read as int64 with row group filter: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq(Int.MaxValue - 2, Int.MaxValue - 1, Int.MaxValue).toDF("c").write.parquet(path)
+      spark.read
+        .schema("c bigint")
+        .parquet(path)
+        .filter(s"c > ${Int.MaxValue.toLong - 1L}")
+    } { df =>
+      // native_iceberg_compat rejects INT32->INT64 widening (Case 2). The filter
+      // never runs.
+      intercept[SparkException] {
+        df.collect()
+      }
+    }
+  }
 }
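
The overflow that case 5 guards against is plain 32-bit truncation:
narrowing an out-of-range 64-bit filter constant flips its sign, and a
min/max row-group check evaluated at that width could prune groups whose
rows actually match. The wraparound in isolation:

    // A 64-bit constant just past the INT32 range...
    val constant: Long = Int.MaxValue.toLong + 1L

    // ...becomes the most negative 32-bit value when truncated, so an INT32
    // statistics comparison would run against -2147483648 instead of 2^31.
    assert(constant.toInt == Int.MinValue)
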
From b012e99a6a9df51e661907a84ed63107e37ac661 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:01:45 -0600
Subject: [PATCH 07/12] test: case 6 string read as int

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 34 +++++++++++++++++-
 1 file changed, 33 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 8f974c151d..c00c93c53c 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -51,7 +51,7 @@ import org.apache.comet.CometConf
 // 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  OK (silent, possible wall-clock diff)   throw
 // 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  OK (reads, values unverified)           throw
 // 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     OK (1 row, no overflow)                 throw
-// 6. STRING -> INT                     throw      throw  throw  ?                                       ?
+// 6. STRING -> INT                     throw      throw  throw  OK (garbage values)                     throw
 // 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  ?                                       ?
 // C1. INT8 -> INT32                    OK         OK     OK     ?                                       ?
 // C2. FLOAT -> DOUBLE                  OK         OK     OK     ?                                       ?
@@ -253,4 +253,36 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       }
     }
   }
+
+  // Case 6: STRING column read as INT. Spark's vectorized reader throws on all
+  // versions because BINARY (string) cannot be converted to INT32 at the
+  // physical Parquet level.
+  // native_datafusion: silently succeeds; reinterprets the BINARY bytes of each
+  // string as raw INT32 bytes (garbage values). Does NOT throw.
+  // native_iceberg_compat: throws SparkException (aligns with Spark).
+  test(s"string read as int: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      Seq("a", "b", "c").toDF("c").write.parquet(path)
+      spark.read.schema("c int").parquet(path)
+    } { df =>
+      // Pattern 2 (silent garbage): native_datafusion reinterprets string BINARY
+      // bytes as INT32 without throwing. Values are meaningless but the read
+      // succeeds with the expected row count.
+      val outcome = Try(df.collect())
+      assert(outcome.isSuccess, s"unexpected failure: $outcome")
+      assert(outcome.get.length == 3)
+    }
+  }
+
+  test(s"string read as int: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq("a", "b", "c").toDF("c").write.parquet(path)
+      spark.read.schema("c int").parquet(path)
+    } { df =>
+      val outcome = Try(df.collect())
+      assert(
+        outcome.isFailure && outcome.failed.get.isInstanceOf[SparkException],
+        s"expected SparkException, got: $outcome")
+    }
+  }
 }
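
What "garbage values" means in case 6: a reader that trusts the requested
INT32 type takes the string column's raw bytes at face value. Roughly that
effect, sketched with a ByteBuffer (illustrative only, not Comet's actual
decoding path; Parquet stores INT32 little-endian):

    import java.nio.{ByteBuffer, ByteOrder}

    val utf8 = "value-0".getBytes("UTF-8")

    // The first four bytes "valu" (0x76 0x61 0x6C 0x75) read back as a
    // little-endian 32-bit integer: well-defined, but meaningless as a number.
    val garbage = ByteBuffer.wrap(utf8, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt
    assert(garbage == 0x756C6176)
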
From 318cac7f8c7a3471101b37bc96b8389b985cb38b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:17:09 -0600
Subject: [PATCH 08/12] test: case 7 timestamp_ntz read as array

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 33 +++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index c00c93c53c..1108fa8775 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -285,4 +285,37 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
         s"expected SparkException, got: $outcome")
     }
   }
+
+  // Case 7: TIMESTAMP_NTZ column read as ARRAY<TIMESTAMP_NTZ>. Spark throws on
+  // all versions (SPARK-45604) because the requested type is a list/group but
+  // the physical Parquet column is a scalar.
+  test(s"timestamp_ntz read as array: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      Seq(java.time.LocalDateTime.parse("2020-01-01T00:00:00"))
+        .toDF("ts")
+        .write
+        .parquet(path)
+      spark.read.schema("ts array<timestamp_ntz>").parquet(path)
+    } { df =>
+      val outcome = Try(df.collect())
+      assert(
+        outcome.isFailure && outcome.failed.get.isInstanceOf[SparkException],
+        s"expected SparkException, got: $outcome")
+    }
+  }
+
+  test(s"timestamp_ntz read as array: ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq(java.time.LocalDateTime.parse("2020-01-01T00:00:00"))
+        .toDF("ts")
+        .write
+        .parquet(path)
+      spark.read.schema("ts array<timestamp_ntz>").parquet(path)
+    } { df =>
+      val outcome = Try(df.collect())
+      assert(
+        outcome.isFailure && outcome.failed.get.isInstanceOf[SparkException],
+        s"expected SparkException, got: $outcome")
+    }
+  }
 }
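
Case 7 differs from the value-conversion cases: ARRAY<TIMESTAMP_NTZ>
corresponds to a Parquet group (the three-level list encoding), while the
file holds a primitive column, so the mismatch is structural. The shape
difference, sketched with parquet-mr's schema API (a sketch; logical-type
annotations omitted for brevity):

    import org.apache.parquet.schema.MessageTypeParser

    // What case 7's writer produces: a primitive (scalar) column.
    val fileSchema = MessageTypeParser.parseMessageType(
      "message root { optional int64 ts; }")

    // The group structure an ARRAY<...> read schema corresponds to.
    val requestedSchema = MessageTypeParser.parseMessageType(
      """message root {
        |  optional group ts (LIST) {
        |    repeated group list {
        |      optional int64 element;
        |    }
        |  }
        |}""".stripMargin)

    // A primitive column can never be read as a group, so both readers reject it.
    assert(fileSchema.getType("ts").isPrimitive)
    assert(!requestedSchema.getType("ts").isPrimitive)
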
From 318cac7f8c7a3471101b37bc96b8389b985cb38b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:17:27 -0600
Subject: [PATCH 09/12] test: update matrix row 7 with confirmed throw
 outcomes

---
 .../org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 1108fa8775..e4fee5ea1c 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -52,7 +52,7 @@ import org.apache.comet.CometConf
 // 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  OK (reads, values unverified)           throw
 // 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     OK (1 row, no overflow)                 throw
 // 6. STRING -> INT                     throw      throw  throw  OK (garbage values)                     throw
-// 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  ?                                       ?
+// 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  throw                                   throw
 // C1. INT8 -> INT32                    OK         OK     OK     ?                                       ?
 // C2. FLOAT -> DOUBLE                  OK         OK     OK     ?                                       ?
 class ParquetSchemaMismatchSuite extends CometTestBase {
From 60a0ffa3106a4714f915821671ed39709b3c573c Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:20:19 -0600
Subject: [PATCH 10/12] test: control case int8 read as int32

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index e4fee5ea1c..50d985abc0 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -53,7 +53,7 @@ import org.apache.comet.CometConf
 // 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     OK (1 row, no overflow)                 throw
 // 6. STRING -> INT                     throw      throw  throw  OK (garbage values)                     throw
 // 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  throw                                   throw
-// C1. INT8 -> INT32                    OK         OK     OK     ?                                       ?
+// C1. INT8 -> INT32                    OK         OK     OK     OK (widened values)                     OK (widened values)
 // C2. FLOAT -> DOUBLE                  OK         OK     OK     ?                                       ?
 class ParquetSchemaMismatchSuite extends CometTestBase {
   import testImplicits._
@@ -318,4 +318,17 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
         s"expected SparkException, got: $outcome")
     }
   }
+
+  // Control C1: INT8 -> INT32 widening. Allowed by Spark on all versions and
+  // expected to succeed in both Comet scan impls.
+  scanImpls.foreach { scanImpl =>
+    test(s"int8 read as int32 (control): $scanImpl") {
+      withMismatchedSchema(scanImpl) { path =>
+        Seq(1.toByte, 2.toByte, 3.toByte).toDF("c").write.parquet(path)
+        spark.read.schema("c int").parquet(path)
+      } { df =>
+        checkAnswer(df, Seq(1, 2, 3).map(org.apache.spark.sql.Row(_)))
+      }
+    }
+  }
 }
From eabfdef4d628f45974b2dd74ca9885e442b7cdb5 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:23:07 -0600
Subject: [PATCH 11/12] test: control case float read as double

---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 31 ++++++++++++++++++-
 1 file changed, 30 insertions(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 50d985abc0..2f369fb3f5 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -54,7 +54,7 @@ import org.apache.comet.CometConf
 // 6. STRING -> INT                     throw      throw  throw  OK (garbage values)                     throw
 // 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  throw                                   throw
 // C1. INT8 -> INT32                    OK         OK     OK     OK (widened values)                     OK (widened values)
-// C2. FLOAT -> DOUBLE                  OK         OK     OK     ?                                       ?
+// C2. FLOAT -> DOUBLE                  OK         OK     OK     OK (widened values)                     throw (diverges from Spark)
 class ParquetSchemaMismatchSuite extends CometTestBase {
   import testImplicits._
 
@@ -331,4 +331,33 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       }
     }
   }
+
+  // Control C2: FLOAT -> DOUBLE widening. Allowed by Spark on all versions.
+  // native_datafusion: succeeds with widened values (Pattern 1).
+  // native_iceberg_compat: throws SparkException via TypeUtil.checkParquetType
+  // (SchemaColumnConvertNotSupportedException); does not support FLOAT->DOUBLE
+  // widening. This is a divergence from Spark's reference behavior.
+  test(s"float read as double (control): ${CometConf.SCAN_NATIVE_DATAFUSION}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
+      Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path)
+      spark.read.schema("c double").parquet(path)
+    } { df =>
+      // Float -> Double is exact for these magnitudes.
+      checkAnswer(df, Seq(1.0d, 2.0d, 3.0d).map(org.apache.spark.sql.Row(_)))
+    }
+  }
+
+  test(s"float read as double (control): ${CometConf.SCAN_NATIVE_ICEBERG_COMPAT}") {
+    withMismatchedSchema(CometConf.SCAN_NATIVE_ICEBERG_COMPAT) { path =>
+      Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path)
+      spark.read.schema("c double").parquet(path)
+    } { df =>
+      // native_iceberg_compat rejects FLOAT->DOUBLE widening via
+      // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException).
+      // This diverges from Spark which allows this widening on all versions.
+      intercept[SparkException] {
+        df.collect()
+      }
+    }
+  }
 }
From cd8e68be4138d0105b6d036e8261fd0655f7945d Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 12:42:14 -0600
Subject: [PATCH 12/12] test: handle Spark 4.0 type widening in iceberg-compat
 assertions

On Spark 4.0, COMET_SCHEMA_EVOLUTION_ENABLED defaults to true and
TypeUtil.checkParquetType has an isSpark40Plus guard, so four
native_iceberg_compat tests that previously expected SparkException now
succeed with widened values. Make each assertion version-conditional
using CometSparkSessionExtensions.isSpark40Plus and update the behavior
matrix accordingly.
---
 .../parquet/ParquetSchemaMismatchSuite.scala  | 83 +++++++++++++------
 1 file changed, 56 insertions(+), 27 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
index 2f369fb3f5..70038936ab 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetSchemaMismatchSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.{CometTestBase, DataFrame}
 import org.apache.spark.sql.internal.SQLConf
 
 import org.apache.comet.CometConf
+import org.apache.comet.CometSparkSessionExtensions
 
 /**
  * Documents Comet's behavior for the Parquet read-schema/file-schema mismatch cases tracked in
@@ -47,14 +48,14 @@ import org.apache.comet.CometConf
 //
 // Case                                 Spark 3.4  3.5    4.0    Comet native_datafusion                 Comet native_iceberg_compat
 // 1. BINARY -> TIMESTAMP               throw      throw  throw  throw                                   throw
-// 2. INT32 -> INT64                    throw      throw  OK     OK (widened values)                     throw
-// 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  OK (silent, possible wall-clock diff)   throw
+// 2. INT32 -> INT64                    throw      throw  OK     OK (widened values)                     throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true)
+// 3. INT96 LTZ -> TIMESTAMP_NTZ        throw      throw  throw  OK (silent, possible wall-clock diff)   throw on 3.x / OK on 4.0 (isSpark40Plus guard in TypeUtil)
 // 4. Decimal(10,2) -> Decimal(5,0)     throw      throw  throw  OK (reads, values unverified)           throw
-// 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     OK (1 row, no overflow)                 throw
+// 5. INT32 -> INT64 w/ rowgroup filter throw      throw  OK     OK (1 row, no overflow)                 throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true)
 // 6. STRING -> INT                     throw      throw  throw  OK (garbage values)                     throw
 // 7. TIMESTAMP_NTZ -> ARRAY<...>       throw      throw  throw  throw                                   throw
 // C1. INT8 -> INT32                    OK         OK     OK     OK (widened values)                     OK (widened values)
-// C2. FLOAT -> DOUBLE                  OK         OK     OK     OK (widened values)                     throw (diverges from Spark)
+// C2. FLOAT -> DOUBLE                  OK         OK     OK     OK (widened values)                     throw on 3.x / OK on 4.0 (COMET_SCHEMA_EVOLUTION_ENABLED defaults true)
 class ParquetSchemaMismatchSuite extends CometTestBase {
   import testImplicits._
 
@@ -117,8 +118,9 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
   // Case 2: INT32 read as INT64 (value-preserving widening). Spark 3.4/3.5
   // throw SparkException; Spark 4.0 allows widening.
   // native_datafusion: succeeds with widened values (Pattern 1).
-  // native_iceberg_compat: throws SparkException (SchemaColumnConvertNotSupportedException
-  // from TypeUtil.checkParquetType); does not support INT32->INT64 widening.
+  // native_iceberg_compat: throws SparkException on Spark 3.x (SchemaColumnConvertNotSupportedException
+  // from TypeUtil.checkParquetType); on Spark 4.0 COMET_SCHEMA_EVOLUTION_ENABLED defaults to true
+  // so the widening is allowed and succeeds with widened values.
   test(s"int32 read as int64: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
     withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
       Seq(1, 2, 3).toDF("c").write.parquet(path)
@@ -134,10 +136,16 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       Seq(1, 2, 3).toDF("c").write.parquet(path)
       spark.read.schema("c bigint").parquet(path)
     } { df =>
-      // Pattern 3 (throw): native_iceberg_compat rejects INT32->INT64 widening
-      // via TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException).
-      intercept[SparkException] {
-        df.collect()
+      // On Spark 3.x: native_iceberg_compat rejects INT32->INT64 widening via
+      // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException).
+      // On Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true so widening
+      // is allowed and succeeds with correctly widened values.
+      if (CometSparkSessionExtensions.isSpark40Plus) {
+        checkAnswer(df, Seq(1L, 2L, 3L).map(org.apache.spark.sql.Row(_)))
+      } else {
+        intercept[SparkException] {
+          df.collect()
+        }
       }
     }
   }
@@ -146,8 +148,9 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
   // versions (SPARK-36182). INT96 carries no timezone info in the Parquet
   // schema, so native_datafusion cannot detect the LTZ -> NTZ mismatch and
   // silently reads (possibly with a wrong wall-clock value).
-  // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ
-  // (mirrors Spark's behavior).
+  // native_iceberg_compat throws via TypeUtil.convertErrorForTimestampNTZ on
+  // Spark 3.x (mirrors Spark's behavior). On Spark 4.0, TypeUtil.checkParquetType
+  // has an isSpark40Plus guard that bypasses the INT96 check, so the read succeeds.
   test(s"int96 timestamp_ltz read as timestamp_ntz: ${CometConf.SCAN_NATIVE_DATAFUSION}") {
     withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
       withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") {
@@ -177,10 +186,17 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       }
       spark.read.schema("ts timestamp_ntz").parquet(path)
     } { df =>
-      // native_iceberg_compat throws SparkException via
-      // TypeUtil.convertErrorForTimestampNTZ; matches Spark's behavior.
-      intercept[SparkException] {
-        df.collect()
+      // On Spark 3.x: native_iceberg_compat throws SparkException via
+      // TypeUtil.convertErrorForTimestampNTZ; matches Spark's behavior.
+      // On Spark 4.0: isSpark40Plus guard in TypeUtil.checkParquetType bypasses
+      // the INT96 check so the read succeeds silently (row count verified only;
+      // the wall-clock value may differ due to LTZ->NTZ reinterpretation).
+      if (CometSparkSessionExtensions.isSpark40Plus) {
+        assert(df.collect().length == 1)
+      } else {
+        intercept[SparkException] {
+          df.collect()
+        }
       }
     }
   }
@@ -246,10 +262,16 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
         .parquet(path)
         .filter(s"c > ${Int.MaxValue.toLong - 1L}")
     } { df =>
-      // native_iceberg_compat rejects INT32->INT64 widening (Case 2). The filter
-      // never runs.
-      intercept[SparkException] {
-        df.collect()
+      // On Spark 3.x: native_iceberg_compat rejects INT32->INT64 widening (Case 2)
+      // so the filter never runs and a SparkException is thrown.
+      // On Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true so widening
+      // is allowed; the filter runs correctly without overflow and returns 1 row.
+      if (CometSparkSessionExtensions.isSpark40Plus) {
+        checkAnswer(df, Seq(Int.MaxValue.toLong).map(org.apache.spark.sql.Row(_)))
+      } else {
+        intercept[SparkException] {
+          df.collect()
+        }
       }
     }
   }
@@ -334,9 +356,10 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
 
   // Control C2: FLOAT -> DOUBLE widening. Allowed by Spark on all versions.
   // native_datafusion: succeeds with widened values (Pattern 1).
-  // native_iceberg_compat: throws SparkException via TypeUtil.checkParquetType
-  // (SchemaColumnConvertNotSupportedException); does not support FLOAT->DOUBLE
-  // widening. This is a divergence from Spark's reference behavior.
+  // native_iceberg_compat on Spark 3.x: throws SparkException via TypeUtil.checkParquetType
+  // (SchemaColumnConvertNotSupportedException); diverges from Spark's reference behavior.
+  // native_iceberg_compat on Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true
+  // so the widening is allowed and succeeds with widened values (matches Spark).
   test(s"float read as double (control): ${CometConf.SCAN_NATIVE_DATAFUSION}") {
     withMismatchedSchema(CometConf.SCAN_NATIVE_DATAFUSION) { path =>
       Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path)
@@ -352,11 +375,17 @@ class ParquetSchemaMismatchSuite extends CometTestBase {
       Seq(1.0f, 2.0f, 3.0f).toDF("c").write.parquet(path)
       spark.read.schema("c double").parquet(path)
     } { df =>
-      // native_iceberg_compat rejects FLOAT->DOUBLE widening via
-      // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException).
-      // This diverges from Spark which allows this widening on all versions.
-      intercept[SparkException] {
-        df.collect()
+      // On Spark 3.x: native_iceberg_compat rejects FLOAT->DOUBLE widening via
+      // TypeUtil.checkParquetType (SchemaColumnConvertNotSupportedException);
+      // diverges from Spark which allows this widening on all versions.
+      // On Spark 4.0: COMET_SCHEMA_EVOLUTION_ENABLED defaults to true so widening
+      // is allowed and succeeds with correctly widened values (matches Spark).
+      if (CometSparkSessionExtensions.isSpark40Plus) {
+        checkAnswer(df, Seq(1.0d, 2.0d, 3.0d).map(org.apache.spark.sql.Row(_)))
+      } else {
+        intercept[SparkException] {
+          df.collect()
+        }
       }
     }
   }