From 96407464a9c4df87155e692b1bfcefe0c5de9210 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Tue, 26 May 2026 15:57:06 +0800 Subject: [PATCH] fix(parquet): fix parquet writer metrics conversion --- src/iceberg/file_writer.h | 3 + src/iceberg/metrics.h | 2 +- src/iceberg/metrics_config.cc | 27 +- src/iceberg/metrics_config.h | 10 +- src/iceberg/parquet/parquet_metrics.cc | 272 ++++++++++++++---- ...t_metrics.h => parquet_metrics_internal.h} | 5 +- src/iceberg/parquet/parquet_writer.cc | 28 +- src/iceberg/test/metrics_test_base.cc | 47 ++- src/iceberg/test/metrics_test_base.h | 4 + src/iceberg/test/parquet_metrics_test.cc | 97 ++++++- src/iceberg/util/truncate_util.cc | 14 +- 11 files changed, 387 insertions(+), 122 deletions(-) rename src/iceberg/parquet/{parquet_metrics.h => parquet_metrics_internal.h} (95%) diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index 3c890453b..4b1045a40 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -60,6 +60,9 @@ class ICEBERG_EXPORT WriterProperties : public ConfigBase { "zstd"}; inline static Entry kParquetCompressionLevel{ "write.parquet.compression-level", ""}; + /// \brief Maximum number of rows in each Parquet row group. + inline static Entry kParquetMaxRowGroupRows{"write.parquet.max-row-group-rows", + 1024 * 1024}; /// TODO(gangwu): add table properties with write.avro|parquet|orc.* diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h index 083cd0410..8cfbbdf8b 100644 --- a/src/iceberg/metrics.h +++ b/src/iceberg/metrics.h @@ -36,7 +36,7 @@ namespace iceberg { /// lower/upper bounds for a specific field identified by its field_id. struct ICEBERG_EXPORT FieldMetrics { /// \brief The field ID this metrics belongs to. - int32_t field_id; + int32_t field_id = -1; /// \brief The total number of values (including nulls) for this field. /// A negative value indicates the count is unknown. diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc index ea20d47e1..95d1a1fe1 100644 --- a/src/iceberg/metrics_config.cc +++ b/src/iceberg/metrics_config.cc @@ -125,9 +125,8 @@ const std::shared_ptr& MetricsConfig::Default() { Result> MetricsConfig::Make(const Table& table) { ICEBERG_ASSIGN_OR_RAISE(auto schema, table.schema()); - auto sort_order = table.sort_order(); - return MakeInternal(table.properties(), *schema, - *sort_order.value_or(SortOrder::Unsorted())); + auto order = table.sort_order().value_or(SortOrder::Unsorted()); + return MakeInternal(table.properties(), schema.get(), order.get()); } Result> MetricsConfig::Make( @@ -135,11 +134,11 @@ Result> MetricsConfig::Make( // Create a minimal TableProperties wrapper for the properties TableProperties props = TableProperties::FromMap(std::move(properties)); - return MakeInternal(props, Schema({}), *SortOrder::Unsorted()); + return MakeInternal(props, /*schema=*/nullptr, /*order=*/nullptr); } Result> MetricsConfig::MakeInternal( - const TableProperties& props, const Schema& schema, const SortOrder& order) { + const TableProperties& props, const Schema* schema, const SortOrder* order) { ColumnModeMap column_modes; MetricsMode default_mode = kDefaultMetricsMode; @@ -148,16 +147,16 @@ Result> MetricsConfig::MakeInternal( props.Get(TableProperties::kDefaultWriteMetricsMode); ICEBERG_ASSIGN_OR_RAISE(default_mode, ParseMode(configured_metrics_mode, kDefaultMetricsMode)); - } else { + } else if (schema != nullptr) { int32_t max_inferred_columns = MaxInferredColumns(props); GetProjectedIdsVisitor visitor(/*include_struct_ids=*/true); - ICEBERG_RETURN_UNEXPECTED(visitor.Visit(schema)); + ICEBERG_RETURN_UNEXPECTED(visitor.Visit(*schema)); auto projected_columns = static_cast(visitor.Finish().size()); if (max_inferred_columns < projected_columns) { ICEBERG_ASSIGN_OR_RAISE(auto limit_field_ids, - LimitFieldIds(schema, max_inferred_columns)); + LimitFieldIds(*schema, max_inferred_columns)); for (auto id : limit_field_ids) { - ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema.FindColumnNameById(id)); + ICEBERG_ASSIGN_OR_RAISE(auto column_name, schema->FindColumnNameById(id)); ICEBERG_CHECK(column_name.has_value(), "Field id {} not found in schema", id); column_modes[std::string(column_name.value())] = kDefaultMetricsMode; } @@ -167,10 +166,12 @@ Result> MetricsConfig::MakeInternal( } // First set sorted column with sorted column default (can be overridden by user) - auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode); - auto sorted_columns = SortOrder::OrderPreservingSortedColumns(schema, order); - for (const auto& sorted_column : sorted_columns) { - column_modes[std::string(sorted_column)] = sorted_col_default_mode; + if (schema != nullptr && order != nullptr) { + auto sorted_col_default_mode = SortedColumnDefaultMode(default_mode); + auto sorted_columns = SortOrder::OrderPreservingSortedColumns(*schema, *order); + for (const auto& sorted_column : sorted_columns) { + column_modes[std::string(sorted_column)] = sorted_col_default_mode; + } } // Handle user overrides of defaults diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h index a5e51ee6c..bba7307d3 100644 --- a/src/iceberg/metrics_config.h +++ b/src/iceberg/metrics_config.h @@ -101,12 +101,14 @@ class ICEBERG_EXPORT MetricsConfig { /// /// \param props will be read for metrics overrides (write.metadata.metrics.column.*) /// and default(write.metadata.metrics.default) - /// \param schema table schema - /// \param order sort order columns, will be promoted to truncate(16) + /// \param schema table schema, or nullptr when only properties are available + /// \param order table sort order, or nullptr when unavailable. If provided, sorted + /// columns use at least the default truncate metrics mode (`truncate(16)`) when + /// the default mode is `none` or `counts`; explicit column overrides still win. /// \return metrics configuration static Result> MakeInternal(const TableProperties& props, - const Schema& schema, - const SortOrder& order); + const Schema* schema, + const SortOrder* order); ColumnModeMap column_modes_; MetricsMode default_mode_; diff --git a/src/iceberg/parquet/parquet_metrics.cc b/src/iceberg/parquet/parquet_metrics.cc index 09b727a1a..32dc9fec8 100644 --- a/src/iceberg/parquet/parquet_metrics.cc +++ b/src/iceberg/parquet/parquet_metrics.cc @@ -17,13 +17,15 @@ * under the License. */ -#include "iceberg/parquet/parquet_metrics.h" - +#include #include #include #include +#include #include #include +#include +#include #include #include @@ -31,12 +33,14 @@ #include #include "iceberg/expression/literal.h" +#include "iceberg/parquet/parquet_metrics_internal.h" #include "iceberg/result.h" #include "iceberg/schema.h" #include "iceberg/type.h" #include "iceberg/util/checked_cast.h" -#include "iceberg/util/conversions.h" +#include "iceberg/util/decimal.h" #include "iceberg/util/truncate_util.h" +#include "iceberg/util/uuid.h" #include "iceberg/util/visit_type.h" namespace iceberg::parquet { @@ -67,6 +71,163 @@ std::optional FindColumnIndex(const ::parquet::SchemaDescriptor& parque return it != columns.end() ? std::optional(*it) : std::nullopt; } +int64_t CollectColumnSize(const ::parquet::FileMetaData& metadata, int32_t column_idx) { + int64_t size = 0; + for (int rg = 0; rg < metadata.num_row_groups(); ++rg) { + size += metadata.RowGroup(rg)->ColumnChunk(column_idx)->total_compressed_size(); + } + return size; +} + +template +Result TypedStatsLiteral(const ::parquet::Statistics& stats, bool is_min, + Converter&& converter) { + const auto& typed_stats = internal::checked_cast(stats); + return converter(is_min ? typed_stats.min() : typed_stats.max()); +} + +std::vector BytesFromByteArray(const ::parquet::ByteArray& value) { + return std::vector{value.ptr, value.ptr + value.len}; +} + +std::vector BytesFromFLBA(const ::parquet::FixedLenByteArray& value, + int32_t length) { + return std::vector{value.ptr, value.ptr + length}; +} + +Literal DecimalLiteral(int128_t value, const PrimitiveType& iceberg_type) { + const auto& decimal_type = internal::checked_cast(iceberg_type); + return Literal::Decimal(value, decimal_type.precision(), decimal_type.scale()); +} + +Result DecimalLiteralFromBytes(std::span bytes, + const PrimitiveType& iceberg_type) { + ICEBERG_ASSIGN_OR_RAISE(auto decimal, + Decimal::FromBigEndian(bytes.data(), bytes.size())); + return DecimalLiteral(decimal.value(), iceberg_type); +} + +Result BinaryStatsLiteral(std::vector bytes, + const PrimitiveType& iceberg_type) { + switch (iceberg_type.type_id()) { + case TypeId::kString: + return Literal::String(std::string(bytes.begin(), bytes.end())); + case TypeId::kBinary: + return Literal::Binary(std::move(bytes)); + case TypeId::kFixed: + return Literal::Fixed(std::move(bytes)); + case TypeId::kUuid: { + ICEBERG_ASSIGN_OR_RAISE(auto uuid, Uuid::FromBytes(bytes)); + return Literal::UUID(std::move(uuid)); + } + case TypeId::kDecimal: + return DecimalLiteralFromBytes(bytes, iceberg_type); + default: + return InvalidArgument( + "Cannot convert Parquet binary statistics to Iceberg type {}", + iceberg_type.ToString()); + } +} + +Result Int32StatsLiteral(int32_t value, const PrimitiveType& iceberg_type) { + switch (iceberg_type.type_id()) { + case TypeId::kInt: + return Literal::Int(value); + case TypeId::kLong: + return Literal::Long(value); + case TypeId::kDate: + return Literal::Date(value); + case TypeId::kDecimal: + return DecimalLiteral(value, iceberg_type); + default: + return InvalidArgument("Cannot convert Parquet INT32 statistics to Iceberg type {}", + iceberg_type.ToString()); + } +} + +Result Int64StatsLiteral(int64_t value, const PrimitiveType& iceberg_type) { + switch (iceberg_type.type_id()) { + case TypeId::kLong: + return Literal::Long(value); + case TypeId::kTime: + return Literal::Time(value); + case TypeId::kTimestamp: + return Literal::Timestamp(value); + case TypeId::kTimestampTz: + return Literal::TimestampTz(value); + case TypeId::kTimestampNs: + return Literal::TimestampNs(value); + case TypeId::kTimestampTzNs: + return Literal::TimestampTzNs(value); + case TypeId::kDecimal: + return DecimalLiteral(value, iceberg_type); + default: + return InvalidArgument("Cannot convert Parquet INT64 statistics to Iceberg type {}", + iceberg_type.ToString()); + } +} + +Result FloatStatsLiteral(float value, const PrimitiveType& iceberg_type) { + switch (iceberg_type.type_id()) { + case TypeId::kFloat: + return Literal::Float(value); + case TypeId::kDouble: + return Literal::Double(value); + default: + return InvalidArgument("Cannot convert Parquet FLOAT statistics to Iceberg type {}", + iceberg_type.ToString()); + } +} + +bool IsFloatingType(const PrimitiveType& type) { + return type.type_id() == TypeId::kFloat || type.type_id() == TypeId::kDouble; +} + +bool NeedsBoundTruncation(const PrimitiveType& type) { + return type.type_id() == TypeId::kString || type.type_id() == TypeId::kBinary; +} + +Result StatsValueToLiteral(const ::parquet::ColumnDescriptor& column, + const PrimitiveType& iceberg_type, + const ::parquet::Statistics& stats, bool is_min) { + switch (column.physical_type()) { + case ::parquet::Type::BOOLEAN: + return TypedStatsLiteral<::parquet::BoolStatistics>( + stats, is_min, [](bool value) { return Literal::Boolean(value); }); + case ::parquet::Type::INT32: + return TypedStatsLiteral<::parquet::Int32Statistics>( + stats, is_min, + [&](int32_t value) { return Int32StatsLiteral(value, iceberg_type); }); + case ::parquet::Type::INT64: + return TypedStatsLiteral<::parquet::Int64Statistics>( + stats, is_min, + [&](int64_t value) { return Int64StatsLiteral(value, iceberg_type); }); + case ::parquet::Type::FLOAT: + return TypedStatsLiteral<::parquet::FloatStatistics>( + stats, is_min, + [&](float value) { return FloatStatsLiteral(value, iceberg_type); }); + case ::parquet::Type::DOUBLE: + return TypedStatsLiteral<::parquet::DoubleStatistics>( + stats, is_min, [](double value) { return Literal::Double(value); }); + case ::parquet::Type::BYTE_ARRAY: + return TypedStatsLiteral<::parquet::ByteArrayStatistics>( + stats, is_min, [&](const ::parquet::ByteArray& value) { + return BinaryStatsLiteral(BytesFromByteArray(value), iceberg_type); + }); + case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: + return TypedStatsLiteral<::parquet::FLBAStatistics>( + stats, is_min, [&](const ::parquet::FixedLenByteArray& value) { + return BinaryStatsLiteral(BytesFromFLBA(value, column.type_length()), + iceberg_type); + }); + case ::parquet::Type::INT96: + case ::parquet::Type::UNDEFINED: + return NotSupported("Cannot convert Parquet statistics for physical type {}", + static_cast(column.physical_type())); + } + std::unreachable(); +} + /// \brief Collect counts (value count and null count) from footer statistics. /// \param field_id The Iceberg field ID. /// \param metadata The Parquet file metadata. @@ -105,6 +266,7 @@ Result> CollectBounds( int32_t field_id, std::shared_ptr iceberg_type, const ::parquet::FileMetaData& metadata, int32_t column_idx, int32_t truncate_length) { + auto column_desc = metadata.schema()->Column(column_idx); int64_t null_count = 0; int64_t value_count = 0; std::optional lower_bound; @@ -122,28 +284,24 @@ Result> CollectBounds( value_count += column_chunk->num_values(); if (stats->HasMinMax()) { - auto min_bytes = stats->EncodeMin(); - auto min_span = std::span( - reinterpret_cast(min_bytes.data()), min_bytes.size()); ICEBERG_ASSIGN_OR_RAISE(auto min_value, - Conversions::FromBytes(iceberg_type, min_span)); + StatsValueToLiteral(*column_desc, *iceberg_type, *stats, + /*is_min=*/true)); if (!lower_bound.has_value() || min_value < lower_bound.value()) { lower_bound = std::move(min_value); } - auto max_bytes = stats->EncodeMax(); - auto max_span = std::span( - reinterpret_cast(max_bytes.data()), max_bytes.size()); ICEBERG_ASSIGN_OR_RAISE(auto max_value, - Conversions::FromBytes(iceberg_type, max_span)); + StatsValueToLiteral(*column_desc, *iceberg_type, *stats, + /*is_min=*/false)); if (!upper_bound.has_value() || max_value > upper_bound.value()) { upper_bound = std::move(max_value); } } } - if (!lower_bound.has_value() || !upper_bound.has_value() || lower_bound->IsNaN() || - upper_bound->IsNaN()) { + if (!lower_bound.has_value() || !upper_bound.has_value() || + (IsFloatingType(*iceberg_type) && (lower_bound->IsNaN() || upper_bound->IsNaN()))) { return FieldMetrics{ .field_id = field_id, .value_count = value_count, @@ -151,19 +309,21 @@ Result> CollectBounds( }; } - ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower, - TruncateUtils::TruncateLowerBound( - *iceberg_type, lower_bound.value(), truncate_length)); - ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper, - TruncateUtils::TruncateUpperBound( - *iceberg_type, upper_bound.value(), truncate_length)); + if (NeedsBoundTruncation(*iceberg_type)) { + ICEBERG_ASSIGN_OR_RAISE( + lower_bound, TruncateUtils::TruncateLowerBound(*iceberg_type, lower_bound.value(), + truncate_length)); + ICEBERG_ASSIGN_OR_RAISE( + upper_bound, TruncateUtils::TruncateUpperBound(*iceberg_type, upper_bound.value(), + truncate_length)); + } return FieldMetrics{ .field_id = field_id, .value_count = value_count, .null_value_count = null_count, - .lower_bound = std::move(truncated_lower), - .upper_bound = std::move(truncated_upper), + .lower_bound = std::move(lower_bound), + .upper_bound = std::move(upper_bound), }; } @@ -187,19 +347,27 @@ Result> MetricsFromFieldMetrics( .null_value_count = fm.null_value_count, .nan_value_count = fm.nan_value_count}; - if (truncate_length > 0) { - if (fm.lower_bound.has_value()) { - ICEBERG_ASSIGN_OR_RAISE( - auto lower, TruncateUtils::TruncateLowerBound( - *primitive_type, fm.lower_bound.value(), truncate_length)); - result.lower_bound = std::move(lower); - } - if (fm.upper_bound.has_value()) { - ICEBERG_ASSIGN_OR_RAISE( - auto upper, TruncateUtils::TruncateUpperBound( - *primitive_type, fm.upper_bound.value(), truncate_length)); - result.upper_bound = std::move(upper); - } + if (truncate_length <= 0) { + return result; + } + + if (!NeedsBoundTruncation(*primitive_type)) { + result.lower_bound = fm.lower_bound; + result.upper_bound = fm.upper_bound; + return result; + } + + if (fm.lower_bound.has_value()) { + ICEBERG_ASSIGN_OR_RAISE( + result.lower_bound, + TruncateUtils::TruncateLowerBound(*primitive_type, fm.lower_bound.value(), + truncate_length)); + } + if (fm.upper_bound.has_value()) { + ICEBERG_ASSIGN_OR_RAISE( + result.upper_bound, + TruncateUtils::TruncateUpperBound(*primitive_type, fm.upper_bound.value(), + truncate_length)); } return result; @@ -279,6 +447,10 @@ class CollectMetricsVisitor { int32_t truncate_length = mode.TruncateLength(); const auto& primitive_type = internal::checked_pointer_cast(field.type()); + auto column_idx = FindColumnIndex(parquet_schema_, field_id); + if (column_idx.has_value()) { + metrics_.column_sizes[field_id] = CollectColumnSize(metadata_, column_idx.value()); + } ICEBERG_ASSIGN_OR_RAISE(auto field_metrics, MetricsFromFieldMetrics(field_id, field_metrics_, @@ -330,36 +502,10 @@ Result ParquetMetrics::GetMetrics( const std::unordered_map& field_metrics) { Metrics metrics; - // Collect row count and column sizes - int64_t row_count = 0; - for (int rg = 0; rg < metadata.num_row_groups(); ++rg) { - auto row_group = metadata.RowGroup(rg); - row_count += row_group->num_rows(); - for (int col = 0; col < row_group->num_columns(); ++col) { - auto column_chunk = row_group->ColumnChunk(col); - auto field_id_opt = GetFieldId(*parquet_schema.Column(col)); - if (!field_id_opt.has_value()) { - continue; - } - int32_t field_id = field_id_opt.value(); - - ICEBERG_ASSIGN_OR_RAISE(auto field_name, schema.FindColumnNameById(field_id)); - if (!field_name.has_value()) { - continue; - } - - MetricsMode mode = metrics_config.ColumnMode(field_name.value()); - if (mode.kind != MetricsMode::Kind::kNone) { - metrics.column_sizes[field_id] = - metrics.column_sizes.contains(field_id) - ? metrics.column_sizes[field_id] + column_chunk->total_compressed_size() - : column_chunk->total_compressed_size(); - } - } - } - metrics.row_count = row_count; + metrics.row_count = metadata.num_rows(); - // Collect metrics for all primitive fields + // Apply MetricsConfig while visiting schema fields, then collect footer metrics only + // for fields whose mode is not `none`. CollectMetricsVisitor visitor(parquet_schema, metrics_config, metadata, field_metrics, metrics); ICEBERG_RETURN_UNEXPECTED(visitor.VisitStruct(schema, "")); diff --git a/src/iceberg/parquet/parquet_metrics.h b/src/iceberg/parquet/parquet_metrics_internal.h similarity index 95% rename from src/iceberg/parquet/parquet_metrics.h rename to src/iceberg/parquet/parquet_metrics_internal.h index eb916241c..c78c114ab 100644 --- a/src/iceberg/parquet/parquet_metrics.h +++ b/src/iceberg/parquet/parquet_metrics_internal.h @@ -19,14 +19,13 @@ #pragma once -/// \file iceberg/parquet/parquet_metrics.h +/// \file iceberg/parquet/parquet_metrics_internal.h /// \brief Utilities for extracting metrics from Parquet files. #include #include -#include "iceberg/iceberg_bundle_export.h" #include "iceberg/metrics.h" #include "iceberg/metrics_config.h" #include "iceberg/result.h" @@ -35,7 +34,7 @@ namespace iceberg::parquet { /// \brief Utility class for computing metrics from Parquet files. -class ICEBERG_BUNDLE_EXPORT ParquetMetrics { +class ParquetMetrics { public: ParquetMetrics() = delete; diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index e91a2a6cf..da794cc3e 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -33,7 +34,7 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" -#include "iceberg/parquet/parquet_metrics.h" +#include "iceberg/parquet/parquet_metrics_internal.h" #include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" @@ -94,6 +95,11 @@ class ParquetWriter::Impl { auto properties_builder = ::parquet::WriterProperties::Builder(); properties_builder.compression(compression); + auto max_row_group_rows = + options.properties.Get(WriterProperties::kParquetMaxRowGroupRows); + ICEBERG_PRECHECK(max_row_group_rows > 0, + "Parquet max row group rows must be greater than 0"); + properties_builder.max_row_group_length(max_row_group_rows); if (compression_level.has_value()) { properties_builder.compression_level(compression_level.value()); } @@ -142,12 +148,11 @@ class ParquetWriter::Impl { } ICEBERG_ARROW_RETURN_NOT_OK(writer_->Close()); - auto& metadata = writer_->metadata(); - split_offsets_.reserve(metadata->num_row_groups()); - for (int i = 0; i < metadata->num_row_groups(); ++i) { - split_offsets_.push_back(metadata->RowGroup(i)->file_offset()); - } metadata_ = writer_->metadata(); + split_offsets_.reserve(metadata_->num_row_groups()); + for (int i = 0; i < metadata_->num_row_groups(); ++i) { + split_offsets_.push_back(metadata_->RowGroup(i)->file_offset()); + } writer_.reset(); ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell()); @@ -171,15 +176,12 @@ class ParquetWriter::Impl { std::vector split_offsets() const { return split_offsets_; } Result metrics() { - if (writer_) { - return Invalid("Cannot return metrics for unclosed writer"); - } - if (!metadata_) { - return Metrics(); - } + ICEBERG_PRECHECK(writer_ == nullptr, "Cannot return metrics for unclosed writer"); + ICEBERG_PRECHECK(metadata_ != nullptr, + "Cannot return metrics because Parquet metadata is not available"); // TODO(WZhuo): collect write-side FieldMetrics to support NaN value counts. return ParquetMetrics::GetMetrics(*schema_, *parquet_schema_, *metrics_config_, - *metadata_, {}); + *metadata_); } private: diff --git a/src/iceberg/test/metrics_test_base.cc b/src/iceberg/test/metrics_test_base.cc index cc5f7cd60..7913a7207 100644 --- a/src/iceberg/test/metrics_test_base.cc +++ b/src/iceberg/test/metrics_test_base.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" @@ -80,6 +81,17 @@ void MetricsTestBase::AssertCounts(int field_id, } } +void MetricsTestBase::AssertColumnSizeFields(std::vector expected_field_ids, + const Metrics& metrics) { + std::vector actual_field_ids; + actual_field_ids.reserve(metrics.column_sizes.size()); + for (const auto& [field_id, size] : metrics.column_sizes) { + EXPECT_GT(size, 0) << "Field " << field_id << " should have a positive size"; + actual_field_ids.push_back(field_id); + } + EXPECT_THAT(actual_field_ids, testing::UnorderedElementsAreArray(expected_field_ids)); +} + template void MetricsTestBase::AssertBounds(int field_id, std::shared_ptr type, std::optional expected_lower, @@ -91,6 +103,8 @@ void MetricsTestBase::AssertBounds(int field_id, std::shared_ptr const auto& literal = metrics.lower_bounds.at(field_id); ASSERT_FALSE(literal.IsNull()) << "Field " << field_id << " lower bound literal should not be null"; + EXPECT_EQ(*literal.type(), *type) + << "Field " << field_id << " lower bound literal type mismatch"; EXPECT_EQ(std::get(literal.value()), expected_lower.value()) << "Field " << field_id << " lower bound mismatch"; } else { @@ -103,6 +117,8 @@ void MetricsTestBase::AssertBounds(int field_id, std::shared_ptr const auto& literal = metrics.upper_bounds.at(field_id); ASSERT_FALSE(literal.IsNull()) << "Field " << field_id << " upper bound literal should not be null"; + EXPECT_EQ(*literal.type(), *type) + << "Field " << field_id << " upper bound literal type mismatch"; EXPECT_EQ(std::get(literal.value()), expected_upper.value()) << "Field " << field_id << " upper bound mismatch"; } else { @@ -208,12 +224,12 @@ void MetricsTestBase::MetricsForRepeatedValues() { ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; EXPECT_EQ(*metrics.row_count, 2); + AssertColumnSizeFields({1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}, metrics); AssertCounts(1, 2, 0, metrics); AssertCounts(2, 2, 0, metrics); AssertCounts(3, 2, 1, metrics); - // TODO(WZhuo) Assert NaN metrics - AssertCounts(4, 2, 0, metrics); // floatCol has 2 NaN values + AssertCounts(4, 2, 0, metrics); AssertCounts(5, 2, 0, metrics); AssertCounts(6, 2, 1, metrics); AssertCounts(7, 2, 0, metrics); @@ -317,17 +333,13 @@ void MetricsTestBase::MetricsForDecimals() { EXPECT_EQ(*metrics.row_count, 1); AssertCounts(1, 1, 0, metrics); - // For decimals, bounds exist but we just check they're present - EXPECT_TRUE(metrics.lower_bounds.contains(1)); - EXPECT_TRUE(metrics.upper_bounds.contains(1)); + AssertBounds(1, decimal(4, 2), Decimal(255), Decimal(255), metrics); AssertCounts(2, 1, 0, metrics); - EXPECT_TRUE(metrics.lower_bounds.contains(2)); - EXPECT_TRUE(metrics.upper_bounds.contains(2)); + AssertBounds(2, decimal(14, 2), Decimal(475), Decimal(475), metrics); AssertCounts(3, 1, 0, metrics); - EXPECT_TRUE(metrics.lower_bounds.contains(3)); - EXPECT_TRUE(metrics.upper_bounds.contains(3)); + AssertBounds(3, decimal(22, 2), Decimal(580), Decimal(580), metrics); } void MetricsTestBase::MetricsForNestedStructFields() { @@ -338,6 +350,7 @@ void MetricsTestBase::MetricsForNestedStructFields() { ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; EXPECT_EQ(*metrics.row_count, 1); + AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics); AssertCounts(1, 1, 0, metrics); AssertBounds(1, int32(), std::numeric_limits::min(), @@ -353,7 +366,6 @@ void MetricsTestBase::MetricsForNestedStructFields() { AssertBounds>(6, binary(), std::vector{'A'}, std::vector{'A'}, metrics); - // TODO(WZhuo) Assert NaN metrics AssertCounts(7, 1L, 0L, metrics); AssertBounds(7, float64(), std::nullopt, std::nullopt, metrics); } @@ -373,6 +385,7 @@ void MetricsTestBase::MetricsModeForNestedStructFields() { ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; EXPECT_EQ(*metrics.row_count, 1); + AssertColumnSizeFields({3}, metrics); // Only field 3 (nestedStructCol.longCol) should have bounds EXPECT_EQ(metrics.lower_bounds.size(), 1); @@ -460,6 +473,7 @@ void MetricsTestBase::MetricsForListAndMapElements() { ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; EXPECT_EQ(*metrics.row_count, 1); + AssertColumnSizeFields({}, metrics); // For list and map elements, metrics should not be collected // Field IDs: 1 (leafIntCol), 2 (leafStringCol), 4 (list element), 6 (map key), 7 (map @@ -468,6 +482,7 @@ void MetricsTestBase::MetricsForListAndMapElements() { AssertCounts(2, std::nullopt, std::nullopt, metrics); AssertCounts(4, std::nullopt, std::nullopt, metrics); AssertCounts(6, std::nullopt, std::nullopt, metrics); + AssertCounts(7, std::nullopt, std::nullopt, metrics); AssertBounds(1, int32(), std::nullopt, std::nullopt, metrics); AssertBounds(2, string(), std::nullopt, std::nullopt, metrics); @@ -526,7 +541,6 @@ void MetricsTestBase::MetricsForNaNColumns() { ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; EXPECT_EQ(*metrics.row_count, 2); - // TODO(WZhuo) Assert NaN metrics AssertCounts(1, 2, 0, metrics); AssertCounts(2, 2, 0, metrics); @@ -565,7 +579,6 @@ void MetricsTestBase::ColumnBoundsWithNaNValueAtFront() { ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; EXPECT_EQ(*metrics.row_count, 3); - // TODO(WZhuo) Assert NaN metrics AssertCounts(1, 3, 0, metrics); AssertCounts(2, 3, 0, metrics); @@ -664,6 +677,8 @@ void MetricsTestBase::MetricsForTopLevelWithMultipleRowGroup() { if (SupportsSmallRowGroups()) { ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount()); EXPECT_EQ(split_count, 3); + } else { + FAIL() << "This test must force multiple row groups"; } ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; @@ -694,6 +709,8 @@ void MetricsTestBase::MetricsForNestedStructFieldsWithMultipleRowGroup() { if (SupportsSmallRowGroups()) { ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount()); EXPECT_EQ(split_count, 3); + } else { + FAIL() << "This test must force multiple row groups"; } ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; EXPECT_EQ(*metrics.row_count, 201); @@ -732,7 +749,7 @@ void MetricsTestBase::NoneMetricsMode() { EXPECT_EQ(*metrics.row_count, 1); // In None mode, column_sizes should be empty - EXPECT_TRUE(metrics.column_sizes.empty()); + AssertColumnSizeFields({}, metrics); // All counts should be null AssertCounts(1, std::nullopt, std::nullopt, metrics); @@ -761,7 +778,7 @@ void MetricsTestBase::CountsMetricsMode() { EXPECT_EQ(*metrics.row_count, 1); // In Counts mode, column_sizes should not be empty - EXPECT_FALSE(metrics.column_sizes.empty()); + AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics); // Counts should be present but bounds should be null AssertCounts(1, 1, 0, metrics); @@ -790,7 +807,7 @@ void MetricsTestBase::FullMetricsMode() { EXPECT_EQ(*metrics.row_count, 1); // In Full mode, column_sizes should not be empty - EXPECT_FALSE(metrics.column_sizes.empty()); + AssertColumnSizeFields({1, 3, 5, 6, 7}, metrics); // Both counts and bounds should be present AssertCounts(1, 1, 0, metrics); diff --git a/src/iceberg/test/metrics_test_base.h b/src/iceberg/test/metrics_test_base.h index 1ad9c8823..07b3b62f3 100644 --- a/src/iceberg/test/metrics_test_base.h +++ b/src/iceberg/test/metrics_test_base.h @@ -22,6 +22,7 @@ #include #include #include +#include #include @@ -67,6 +68,9 @@ class MetricsTestBase { std::optional expected_null_count, std::optional expected_nan_count, const Metrics& metrics); + void AssertColumnSizeFields(std::vector expected_field_ids, + const Metrics& metrics); + template void AssertBounds(int field_id, std::shared_ptr type, std::optional expected_lower, std::optional expected_upper, diff --git a/src/iceberg/test/parquet_metrics_test.cc b/src/iceberg/test/parquet_metrics_test.cc index 93c024b01..8286efe08 100644 --- a/src/iceberg/test/parquet_metrics_test.cc +++ b/src/iceberg/test/parquet_metrics_test.cc @@ -17,6 +17,7 @@ * under the License. */ +#include #include #include #include @@ -27,7 +28,9 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" #include "iceberg/file_writer.h" +#include "iceberg/parquet/parquet_metrics_internal.h" #include "iceberg/parquet/parquet_register.h" +#include "iceberg/test/matchers.h" #include "iceberg/test/metrics_test_base.h" #include "iceberg/util/checked_cast.h" @@ -40,8 +43,10 @@ class ParquetMetricsTest : public MetricsTestBase, public ::testing::Test { void SetUp() override { MetricsTestBase::SetUp(); temp_parquet_file_ = "parquet_metrics_test.parquet"; - writer_properties_ = WriterProperties::FromMap( - {{WriterProperties::kParquetCompression.key(), "uncompressed"}}); + writer_properties_ = WriterProperties::FromMap({ + {WriterProperties::kParquetCompression.key(), "uncompressed"}, + {WriterProperties::kParquetMaxRowGroupRows.key(), "100"}, + }); } Result GetMetrics(std::shared_ptr schema, @@ -76,7 +81,31 @@ class ParquetMetricsTest : public MetricsTestBase, public ::testing::Test { return metadata->num_row_groups(); } - bool SupportsSmallRowGroups() const override { return false; } + Result GetMetricsWithFieldMetrics( + std::shared_ptr schema, std::shared_ptr config, + std::shared_ptr<::arrow::Array> records, + const std::unordered_map& field_metrics) { + ICEBERG_ASSIGN_OR_RAISE( + auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet, + {.path = temp_parquet_file_, + .schema = schema, + .io = file_io_, + .metadata = {}, + .metrics_config = config, + .properties = writer_properties_})); + ArrowArray arr; + ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*records, &arr)); + ICEBERG_RETURN_UNEXPECTED(writer->Write(&arr)); + ICEBERG_RETURN_UNEXPECTED(writer->Close()); + + auto io = internal::checked_cast(*file_io_); + auto infile = io.fs()->OpenInputFile(temp_parquet_file_).ValueOrDie(); + auto metadata = ::parquet::ReadMetaData(infile); + return parquet::ParquetMetrics::GetMetrics(*schema, *metadata->schema(), *config, + *metadata, field_metrics); + } + + bool SupportsSmallRowGroups() const override { return true; } private: std::string temp_parquet_file_; @@ -85,4 +114,66 @@ class ParquetMetricsTest : public MetricsTestBase, public ::testing::Test { DEFINE_METRICS_TESTS(ParquetMetricsTest); +TEST_F(ParquetMetricsTest, FieldMetricsOverrideFooterStats) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "floatCol", float32()), + SchemaField::MakeOptional(2, "strCol", string()), + }); + auto arrow_schema = ::arrow::schema({ + ::arrow::field("floatCol", ::arrow::float32(), true), + ::arrow::field("strCol", ::arrow::utf8(), true), + }); + auto records = CreateRecordArrays(arrow_schema, R"([ + {"floatCol": 1.0, "strCol": "footer-value"} + ])"); + std::unordered_map properties = { + {"write.metadata.metrics.default", "truncate(3)"}}; + ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + + std::unordered_map field_metrics{ + {1, FieldMetrics{.field_id = 1, + .value_count = 10, + .null_value_count = 2, + .lower_bound = Literal::Float(5.0F), + .upper_bound = Literal::Float(9.0F)}}, + {2, FieldMetrics{.field_id = 2, + .value_count = 10, + .null_value_count = 1, + .lower_bound = Literal::String("abcdef"), + .upper_bound = Literal::String("abcxyz")}}, + }; + + ICEBERG_UNWRAP_OR_FAIL( + auto metrics, GetMetricsWithFieldMetrics(schema, config, records, field_metrics)); + + AssertCounts(1, 10, 2, metrics); + AssertBounds(1, float32(), 5.0F, 9.0F, metrics); + AssertCounts(2, 10, 1, metrics); + AssertBounds(2, string(), std::string("abc"), std::string("abd"), metrics); +} + +TEST_F(ParquetMetricsTest, UnrepresentableTruncatedUpperBoundIsOmitted) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "binCol", binary()), + }); + auto arrow_schema = ::arrow::schema({ + ::arrow::field("binCol", ::arrow::binary(), false), + }); + + ::arrow::BinaryBuilder builder; + std::vector data = {0xFF, 0xFF, 0x01}; + ASSERT_TRUE(builder.Append(data.data(), data.size()).ok()); + auto array = builder.Finish().ValueOrDie(); + auto records = ::arrow::StructArray::Make({array}, arrow_schema->fields()).ValueOrDie(); + std::unordered_map properties = { + {"write.metadata.metrics.default", "truncate(2)"}}; + ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records)); + + AssertCounts(1, 1, 0, metrics); + AssertBounds>(1, binary(), std::vector({0xFF, 0xFF}), + std::nullopt, metrics); +} + } // namespace iceberg::test diff --git a/src/iceberg/util/truncate_util.cc b/src/iceberg/util/truncate_util.cc index 8a1d07243..1778000f9 100644 --- a/src/iceberg/util/truncate_util.cc +++ b/src/iceberg/util/truncate_util.cc @@ -179,7 +179,7 @@ Result> TruncateLiteralMaxImpl( if (!truncated.has_value()) { return std::nullopt; } - return std::optional(Literal::String(std::move(truncated.value()))); + return Literal::String(std::move(*truncated)); } template <> @@ -187,7 +187,7 @@ Result> TruncateLiteralMaxImpl( const Literal& literal, int32_t width) { const auto& data = std::get>(literal.value()); if (static_cast(data.size()) <= width) { - return std::optional(literal); + return literal; } std::vector truncated(data.begin(), data.begin() + width); @@ -195,7 +195,7 @@ Result> TruncateLiteralMaxImpl( if (*it < 0xFF) { ++(*it); truncated.resize(truncated.size() - std::distance(truncated.rbegin(), it)); - return std::optional(Literal::Binary(std::move(truncated))); + return Literal::Binary(std::move(truncated)); } } return std::nullopt; @@ -207,7 +207,7 @@ Result> TruncateUtils::TruncateUTF8Max( const std::string& source, size_t L) { std::string truncated = TruncateUTF8(source, L); if (truncated == source) { - return std::optional(std::move(truncated)); + return truncated; } // Try incrementing code points from the end @@ -236,7 +236,7 @@ Result> TruncateUtils::TruncateUTF8Max( if (next_code_point <= kUtf8MaxCodePoint) { truncated.resize(cp_start); AppendUtf8CodePoint(next_code_point, truncated); - return std::optional(std::move(truncated)); + return truncated; } } last_cp_start = cp_start; @@ -282,7 +282,7 @@ Result> TruncateUtils::TruncateLiteralMax(const Literal& int32_t width) { if (literal.IsNull()) [[unlikely]] { // Return null as is - return std::optional(literal); + return literal; } if (literal.IsAboveMax() || literal.IsBelowMin()) [[unlikely]] { @@ -316,7 +316,7 @@ Result> TruncateUtils::TruncateUpperBound( case TypeId::kBinary: return TruncateLiteralMax(value, length); default: - return std::optional(value); + return value; } }