native/core/src/parquet/schema_adapter.rs (27 additions, 0 deletions)
@@ -385,6 +385,33 @@ impl SparkPhysicalExprAdapter {
let physical_type = cast.input_field().data_type();
let target_type = cast.target_field().data_type();

// Decimal-to-decimal scale-narrowing check.
// Reject reads where the read schema has a smaller scale than the
// file's, because Spark's Cast below would silently truncate
// fractional digits, producing wrong values. This matches the
// unconditionally-lossy case in issue #4089 (e.g. Decimal(10,2) read
// as Decimal(5,0)).
//
// Precision-only changes with the same scale (e.g. Decimal(5,2) read
// as Decimal(3,2)) are NOT rejected here: Spark 3.x's strict rule
// would reject them, but Spark 4.0's parquet-mr fallback path
// (PARQUET_VECTORIZED_READER_ENABLED=false) and the vectorized
// type-widening path produce null on per-value overflow, which
// DataFusion's cast already does in the adapting-schema path.
if let (DataType::Decimal128(_src_p, src_s), DataType::Decimal128(_dst_p, dst_s)) =
(physical_type, target_type)
{
if dst_s < src_s {
return Err(DataFusionError::Plan(format!(
"Parquet column cannot be converted. Column: [{}], \
Expected: {}, Found: {}",
cast.input_field().name(),
target_type,
physical_type,
)));
}
}

// For complex nested types (Struct, List, Map), Timestamp timezone
// mismatches, and Timestamp→Int64 (nanosAsLong), use CometCastColumnExpr
// with spark_parquet_convert which handles field-name-based selection,
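Aside: a minimal sketch, not part of this diff, of the silent truncation the check above rejects. It assumes the arrow crate that DataFusion builds on; casting Decimal128(10, 2) down to Decimal128(5, 0) rescales 123.45 to 123, dropping the fractional digits without raising an error.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Decimal128Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 123.45 stored as unscaled 12345 under Decimal128(10, 2).
    let src: ArrayRef = Arc::new(
        Decimal128Array::from(vec![12345i128]).with_precision_and_scale(10, 2)?,
    );
    // Rescaling to scale 0 loses the fraction: 123.45 becomes 123.
    let dst = cast(&src, &DataType::Decimal128(5, 0))?;
    let dst = dst.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(dst.value(0), 123); // fractional digits silently gone
    Ok(())
}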
@@ -998,6 +998,29 @@ abstract class ParquetReadSuite extends CometTestBase {
}
}

test("native_datafusion rejects incompatible decimal precision/scale") {
// Regression guard for https://github.com/apache/datafusion-comet/issues/4089.
// Reading Decimal(10,2) under a Decimal(5,0) read schema is unconditionally
// lossy: target precision is smaller than source precision and scales differ.
// Spark's vectorized reader throws SchemaColumnConvertNotSupportedException
// here on all versions. The native_datafusion scan must reject this in its
// schema adapter rather than letting Spark Cast silently rescale/truncate.
withSQLConf(
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
withTempPath { dir =>
val path = dir.getCanonicalPath
spark
.sql("select cast('123.45' as decimal(10,2)) as d " +
"union all select cast('67.89' as decimal(10,2))")
.write
.parquet(path)
val df = spark.read.schema("d decimal(5,0)").parquet(path)
assertThrows[SparkException](df.collect())
}
}
}

test("type widening: byte → short/int/long, short → int/long, int → long") {
withSQLConf(CometConf.COMET_SCHEMA_EVOLUTION_ENABLED.key -> "true") {
withTempPath { dir =>
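For contrast with the scale-narrowing rejection tested above, here is a sketch of the precision-only narrowing the adapter deliberately allows (again assuming the arrow crate; not part of this diff). With arrow's default safe cast, a value that overflows the smaller target precision becomes null instead of raising an error, matching the null-on-overflow behavior described in the schema_adapter.rs comment.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Decimal128Array};
use arrow::compute::cast;
use arrow::datatypes::DataType;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 1.23 and 999.99 under Decimal128(5, 2) (unscaled 123 and 99999).
    let src: ArrayRef = Arc::new(
        Decimal128Array::from(vec![123i128, 99999i128]).with_precision_and_scale(5, 2)?,
    );
    // Same scale, smaller precision: Decimal128(3, 2) holds at most 9.99.
    let dst = cast(&src, &DataType::Decimal128(3, 2))?;
    let dst = dst.as_any().downcast_ref::<Decimal128Array>().unwrap();
    assert_eq!(dst.value(0), 123); // 1.23 still fits
    assert!(dst.is_null(1)); // 999.99 overflows -> null, not an error
    Ok(())
}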