From 99e123510bd78a7a6c85a8646e55bad8edd1b1af Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Sat, 25 Apr 2026 13:03:41 -0600
Subject: [PATCH] fix: reject incompatible decimal precision/scale in native_datafusion scan

The native_datafusion Spark physical expression adapter previously fell
through to a Spark Cast for decimal-to-decimal type changes, which
silently rescales or truncates values that should have raised an error.
Reject reads where the read schema's scale is smaller than the file
column's scale, the unconditionally lossy case from issue #4089
(e.g. Decimal(10,2) read as Decimal(5,0)). Unlike Spark 3.x's strict
TypeUtil.isDecimalTypeMatched rule, precision-only changes at the same
scale are not rejected; they overflow to null per value, matching Spark
4.0's behavior and DataFusion's cast in the adapting-schema path.

Closes #4089.
---
 native/core/src/parquet/schema_adapter.rs  | 27 +++++++++++++++++++
 .../comet/parquet/ParquetReadSuite.scala   | 23 ++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/native/core/src/parquet/schema_adapter.rs b/native/core/src/parquet/schema_adapter.rs
index af79d9082d..1ee1575bea 100644
--- a/native/core/src/parquet/schema_adapter.rs
+++ b/native/core/src/parquet/schema_adapter.rs
@@ -385,6 +385,33 @@ impl SparkPhysicalExprAdapter {
         let physical_type = cast.input_field().data_type();
         let target_type = cast.target_field().data_type();
 
+        // Decimal-to-decimal scale-narrowing check.
+        // Reject reads where the read schema has a smaller scale than the
+        // file's, because Spark's Cast below would silently truncate
+        // fractional digits, producing wrong values. This matches the
+        // unconditionally-lossy case in issue #4089 (e.g. Decimal(10,2) read
+        // as Decimal(5,0)).
+        //
+        // Precision-only changes with the same scale (e.g. Decimal(5,2) read
+        // as Decimal(3,2)) are NOT rejected here: Spark 3.x's strict rule
+        // would reject them, but Spark 4.0's parquet-mr fallback path
+        // (PARQUET_VECTORIZED_READER_ENABLED=false) and the vectorized
+        // type-widening path produce null on per-value overflow, which
+        // DataFusion's cast already does in the adapting-schema path.
+        if let (DataType::Decimal128(_src_p, src_s), DataType::Decimal128(_dst_p, dst_s)) =
+            (physical_type, target_type)
+        {
+            if dst_s < src_s {
+                return Err(DataFusionError::Plan(format!(
+                    "Parquet column cannot be converted. Column: [{}], \
+                     Expected: {}, Found: {}",
+                    cast.input_field().name(),
+                    target_type,
+                    physical_type,
+                )));
+            }
+        }
+
         // For complex nested types (Struct, List, Map), Timestamp timezone
         // mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
         // with spark_parquet_convert which handles field-name-based selection,
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 75ac889228..589d001c85 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -998,6 +998,29 @@ abstract class ParquetReadSuite extends CometTestBase {
     }
   }
 
+  test("native_datafusion rejects incompatible decimal precision/scale") {
+    // Regression guard for https://github.com/apache/datafusion-comet/issues/4089.
+    // Reading Decimal(10,2) under a Decimal(5,0) read schema is unconditionally
+    // lossy: target precision is smaller than source precision and scales differ.
+    // Spark's vectorized reader throws SchemaColumnConvertNotSupportedException
+    // here on all versions. The native_datafusion scan must reject this in its
+    // schema adapter rather than letting Spark Cast silently rescale/truncate.
+    withSQLConf(
+      CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
+      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        spark
+          .sql("select cast('123.45' as decimal(10,2)) as d " +
+            "union all select cast('67.89' as decimal(10,2))")
+          .write
+          .parquet(path)
+        val df = spark.read.schema("d decimal(5,0)").parquet(path)
+        assertThrows[SparkException](df.collect())
+      }
+    }
+  }
+
   test("type widening: byte → short/int/long, short → int/long, int → long") {
     withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
       withTempPath { dir =>
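
For reference, the silent rescale the new check guards against can be reproduced
outside Spark with arrow-rs's cast kernel. The following is a minimal standalone
sketch, not taken from the Comet codebase; the arrow crate version and the exact
rounding applied when the scale is reduced are assumptions.

use arrow::array::{Array, Decimal128Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // 123.45 and 67.89 stored as Decimal128(10, 2), i.e. as unscaled i128 values.
    let file_col = Decimal128Array::from(vec![12345_i128, 6789_i128])
        .with_precision_and_scale(10, 2)?;

    // Casting to the narrower read schema Decimal128(5, 0) succeeds and drops
    // the fractional digits instead of raising an error -- the failure mode the
    // schema adapter now rejects before any cast is planned.
    let read_col = cast(&file_col, &DataType::Decimal128(5, 0))?;
    let read_col = read_col
        .as_any()
        .downcast_ref::<Decimal128Array>()
        .expect("decimal array");

    // With scale 0 the unscaled i128 is the decimal value itself: roughly
    // 123 and 68 (exact rounding may vary by arrow version), not an error.
    for i in 0..read_col.len() {
        println!("{}", read_col.value(i));
    }
    Ok(())
}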