From 63f7e3882f531ba2fe317df3cc33e62375c5669b Mon Sep 17 00:00:00 2001 From: Zhuo Wang Date: Tue, 24 Mar 2026 19:20:59 +0800 Subject: [PATCH] feat: metrics for parquet writer --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/file_writer.h | 3 + src/iceberg/metrics.h | 29 + src/iceberg/metrics_config.cc | 22 + src/iceberg/metrics_config.h | 11 + src/iceberg/parquet/parquet_metrics.cc | 370 ++++++++ src/iceberg/parquet/parquet_metrics.h | 63 ++ src/iceberg/parquet/parquet_writer.cc | 39 +- src/iceberg/test/CMakeLists.txt | 2 + src/iceberg/test/metrics_test_base.cc | 1020 ++++++++++++++++++++++ src/iceberg/test/metrics_test_base.h | 138 +++ src/iceberg/test/parquet_metrics_test.cc | 87 ++ src/iceberg/util/truncate_util.cc | 23 + src/iceberg/util/truncate_util.h | 24 + 14 files changed, 1822 insertions(+), 10 deletions(-) create mode 100644 src/iceberg/parquet/parquet_metrics.cc create mode 100644 src/iceberg/parquet/parquet_metrics.h create mode 100644 src/iceberg/test/metrics_test_base.cc create mode 100644 src/iceberg/test/metrics_test_base.h create mode 100644 src/iceberg/test/parquet_metrics_test.cc diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index c4e193b89..c857011cf 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -233,6 +233,7 @@ if(ICEBERG_BUILD_BUNDLE) avro/avro_schema_util.cc avro/avro_stream_internal.cc parquet/parquet_data_util.cc + parquet/parquet_metrics.cc parquet/parquet_reader.cc parquet/parquet_register.cc parquet/parquet_schema_util.cc diff --git a/src/iceberg/file_writer.h b/src/iceberg/file_writer.h index a49b5228e..3c890453b 100644 --- a/src/iceberg/file_writer.h +++ b/src/iceberg/file_writer.h @@ -30,6 +30,7 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/file_format.h" #include "iceberg/metrics.h" +#include "iceberg/metrics_config.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" #include "iceberg/util/config.h" @@ -77,6 +78,8 @@ struct ICEBERG_EXPORT WriterOptions { 
std::shared_ptr io; /// \brief Metadata to write to the file. std::unordered_map metadata; + /// \brief Metrics configuration. + std::shared_ptr metrics_config = MetricsConfig::Default(); /// \brief Format-specific or implementation-specific properties. WriterProperties properties; }; diff --git a/src/iceberg/metrics.h b/src/iceberg/metrics.h index b476a4759..083cd0410 100644 --- a/src/iceberg/metrics.h +++ b/src/iceberg/metrics.h @@ -30,6 +30,35 @@ namespace iceberg { +/// \brief Field-level metrics for a single column. +/// +/// This structure captures value counts, null counts, NaN counts, and optional +/// lower/upper bounds for a specific field identified by its field_id. +struct ICEBERG_EXPORT FieldMetrics { + /// \brief The field ID this metrics belongs to. + int32_t field_id; + + /// \brief The total number of values (including nulls) for this field. + /// A negative value indicates the count is unknown. + int64_t value_count = -1; + + /// \brief The number of null values for this field. + /// A negative value indicates the count is unknown. + int64_t null_value_count = -1; + + /// \brief The number of NaN values for this field. + /// A negative value indicates the count is unknown. + int64_t nan_value_count = -1; + + /// \brief The lower bound value as a Literal. + /// Empty if no lower bound is available. + std::optional lower_bound = std::nullopt; + + /// \brief The upper bound value as a Literal. + /// Empty if no upper bound is available. 
+ std::optional upper_bound = std::nullopt; +}; + /// \brief Iceberg file format metrics struct ICEBERG_EXPORT Metrics { std::optional row_count; diff --git a/src/iceberg/metrics_config.cc b/src/iceberg/metrics_config.cc index e378640e0..ea20d47e1 100644 --- a/src/iceberg/metrics_config.cc +++ b/src/iceberg/metrics_config.cc @@ -19,6 +19,7 @@ #include "iceberg/metrics_config.h" +#include #include #include @@ -100,6 +101,19 @@ Result MetricsMode::FromString(std::string_view mode) { return InvalidArgument("Invalid metrics mode: {}", mode); } +int32_t MetricsMode::TruncateLength() const { + switch (kind) { + case Kind::kNone: + case Kind::kCounts: + return 0; + case Kind::kTruncate: + return std::get(length); + case Kind::kFull: + return std::numeric_limits::max(); + } + return 0; +} + MetricsConfig::MetricsConfig(ColumnModeMap column_modes, MetricsMode default_mode) : column_modes_(std::move(column_modes)), default_mode_(default_mode) {} @@ -116,6 +130,14 @@ Result> MetricsConfig::Make(const Table& table) { *sort_order.value_or(SortOrder::Unsorted())); } +Result> MetricsConfig::Make( + std::unordered_map properties) { + // Create a minimal TableProperties wrapper for the properties + TableProperties props = TableProperties::FromMap(std::move(properties)); + + return MakeInternal(props, Schema({}), *SortOrder::Unsorted()); +} + Result> MetricsConfig::MakeInternal( const TableProperties& props, const Schema& schema, const SortOrder& order) { ColumnModeMap column_modes; diff --git a/src/iceberg/metrics_config.h b/src/iceberg/metrics_config.h index 7a49e906f..a5e51ee6c 100644 --- a/src/iceberg/metrics_config.h +++ b/src/iceberg/metrics_config.h @@ -52,6 +52,11 @@ struct ICEBERG_EXPORT MetricsMode { Kind kind; std::variant length; + + /// \brief Get the truncate length from this MetricsMode. + /// \return 0 for None/Counts modes, the truncate length for Truncate mode, + /// or INT_MAX for Full mode. 
+ int32_t TruncateLength() const; }; /// \brief Configuration for collecting column metrics for an Iceberg table. @@ -63,6 +68,12 @@ class ICEBERG_EXPORT MetricsConfig { /// \brief Creates a metrics config from a table. static Result> Make(const Table& table); + /// \brief Creates a metrics config from properties (for testing) + /// \param properties Map of property key-value pairs + /// \return A shared pointer to the created MetricsConfig + static Result> Make( + std::unordered_map properties); + /// \brief Get `limit` num of primitive field ids from schema static Result> LimitFieldIds(const Schema& schema, int32_t limit); diff --git a/src/iceberg/parquet/parquet_metrics.cc b/src/iceberg/parquet/parquet_metrics.cc new file mode 100644 index 000000000..09b727a1a --- /dev/null +++ b/src/iceberg/parquet/parquet_metrics.cc @@ -0,0 +1,370 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#include "iceberg/parquet/parquet_metrics.h"
+
+#include <algorithm>
+#include <optional>
+#include <ranges>
+#include <span>
+#include <unordered_map>
+
+#include <parquet/metadata.h>
+#include <parquet/schema.h>
+#include <parquet/statistics.h>
+#include <parquet/types.h>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/truncate_util.h"
+#include "iceberg/util/visit_type.h"
+
+namespace iceberg::parquet {
+
+namespace {
+
+/// \brief Get the Iceberg field ID from a Parquet column descriptor.
+/// \param column The Parquet leaf column to inspect.
+/// \return The field ID, or nullopt if the column has no schema node, the node is not
+/// primitive, or its field ID is negative (i.e. not set).
+std::optional<int32_t> GetFieldId(const ::parquet::ColumnDescriptor& column) {
+  const auto& node = column.schema_node();
+  if (node == nullptr || !node->is_primitive() || node->field_id() < 0) {
+    return std::nullopt;
+  }
+  return node->field_id();
+}
+
+/// \brief Find the column index for a field in the Parquet schema.
+/// \param parquet_schema The Parquet schema descriptor to search.
+/// \param field_id The Iceberg field ID to look for.
+/// \return The index of the first leaf column carrying `field_id`, or nullopt if no
+/// column carries it.
+std::optional<int32_t> FindColumnIndex(const ::parquet::SchemaDescriptor& parquet_schema,
+                                       int32_t field_id) {
+  const auto column_indices = std::views::iota(0, parquet_schema.num_columns());
+  // Comparing optional<int32_t> with int32_t is false for nullopt, so columns without
+  // a field ID never match.
+  const auto match = std::ranges::find_if(column_indices, [&](int idx) {
+    return GetFieldId(*parquet_schema.Column(idx)) == field_id;
+  });
+  if (match == column_indices.end()) {
+    return std::nullopt;
+  }
+  return *match;
+}
+
+/// \brief Collect counts (value count and null count) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \return FieldMetrics carrying the summed value count and null count, or nullopt if
+/// any row group lacks statistics or a null count for the column.
+std::optional<FieldMetrics> CollectCounts(int32_t field_id,
+                                          const ::parquet::FileMetaData& metadata,
+                                          int32_t column_idx) {
+  // Accumulate straight into the result; NaN count and bounds keep their defaults.
+  FieldMetrics counts{.field_id = field_id, .value_count = 0, .null_value_count = 0};
+
+  const int num_row_groups = metadata.num_row_groups();
+  for (int rg = 0; rg < num_row_groups; ++rg) {
+    // Keep the row group alive in a named local while its column chunk is in use.
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    // A partial sum would be misleading, so give up as soon as one row group has no
+    // statistics or no null count.
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    counts.null_value_count += stats->null_count();
+    counts.value_count += column_chunk->num_values();
+  }
+
+  return counts;
+}
+
+/// \brief Collect bounds (lower and upper) from footer statistics.
+/// \param field_id The Iceberg field ID.
+/// \param iceberg_type The Iceberg primitive type for deserializing values.
+/// \param metadata The Parquet file metadata.
+/// \param column_idx The column index in the Parquet schema.
+/// \param truncate_length The length to truncate strings/binary values.
+/// \return FieldMetrics with counts and bounds, or nullopt if stats are not available.
+Result<std::optional<FieldMetrics>> CollectBounds(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::FileMetaData& metadata, int32_t column_idx,
+    int32_t truncate_length) {
+  // Turns one encoded statistic (as returned by EncodeMin/EncodeMax) into a Literal of
+  // `iceberg_type`. Shared by the min and max paths below.
+  // NOTE(review): the span element type is assumed to be `const uint8_t`; confirm
+  // against the declaration of Conversions::FromBytes.
+  auto decode = [&iceberg_type](const auto& encoded) {
+    return Conversions::FromBytes(
+        iceberg_type,
+        std::span<const uint8_t>(reinterpret_cast<const uint8_t*>(encoded.data()),
+                                 encoded.size()));
+  };
+
+  int64_t null_count = 0;
+  int64_t value_count = 0;
+  std::optional<Literal> lower_bound;
+  std::optional<Literal> upper_bound;
+
+  for (int32_t rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    auto column_chunk = row_group->ColumnChunk(column_idx);
+    auto stats = column_chunk->statistics();
+    // Without statistics or a null count for every row group no metrics are reported.
+    if (stats == nullptr || !stats->HasNullCount()) {
+      return std::nullopt;
+    }
+
+    null_count += stats->null_count();
+    value_count += column_chunk->num_values();
+
+    // Row groups without min/max contribute to the counts only.
+    if (!stats->HasMinMax()) {
+      continue;
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto min_value, decode(stats->EncodeMin()));
+    if (!lower_bound.has_value() || min_value < lower_bound.value()) {
+      lower_bound = std::move(min_value);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto max_value, decode(stats->EncodeMax()));
+    if (!upper_bound.has_value() || max_value > upper_bound.value()) {
+      upper_bound = std::move(max_value);
+    }
+  }
+
+  // Report counts only when either bound is missing or is NaN.
+  if (!lower_bound.has_value() || !upper_bound.has_value() || lower_bound->IsNaN() ||
+      upper_bound->IsNaN()) {
+    return FieldMetrics{
+        .field_id = field_id,
+        .value_count = value_count,
+        .null_value_count = null_count,
+    };
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated_lower,
+                          TruncateUtils::TruncateLowerBound(
+                              *iceberg_type, lower_bound.value(), truncate_length));
+  ICEBERG_ASSIGN_OR_RAISE(auto truncated_upper,
+                          TruncateUtils::TruncateUpperBound(
+                              *iceberg_type, upper_bound.value(), truncate_length));
+
+  return FieldMetrics{
+      .field_id = field_id,
+      .value_count = value_count,
+      .null_value_count = null_count,
+      .lower_bound = std::move(truncated_lower),
+      .upper_bound = std::move(truncated_upper),
+  };
+}
+
+/// \brief Process pre-computed field metrics, applying truncation if needed.
+/// \param field_id The field ID to look up.
+/// \param field_metrics The map of pre-computed field metrics.
+/// \param primitive_type The primitive type for truncation.
+/// \param truncate_length The truncation length; bounds are dropped when it is <= 0.
+/// \return nullopt if `field_metrics` has no entry for `field_id`; otherwise the
+/// entry's counts, plus truncated bounds when `truncate_length` is positive.
+Result<std::optional<FieldMetrics>> MetricsFromFieldMetrics(
+    int32_t field_id, const std::unordered_map<int32_t, FieldMetrics>& field_metrics,
+    std::shared_ptr<PrimitiveType> primitive_type, int32_t truncate_length) {
+  const auto entry = field_metrics.find(field_id);
+  if (entry == field_metrics.end()) {
+    return std::nullopt;
+  }
+
+  const FieldMetrics& source = entry->second;
+  // Counts are always copied; bounds are filled in below only when requested.
+  FieldMetrics result{.field_id = source.field_id,
+                      .value_count = source.value_count,
+                      .null_value_count = source.null_value_count,
+                      .nan_value_count = source.nan_value_count};
+  if (truncate_length <= 0) {
+    return result;
+  }
+
+  if (source.lower_bound.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto lower, TruncateUtils::TruncateLowerBound(
+                        *primitive_type, source.lower_bound.value(), truncate_length));
+    result.lower_bound = std::move(lower);
+  }
+  if (source.upper_bound.has_value()) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto upper, TruncateUtils::TruncateUpperBound(
+                        *primitive_type, source.upper_bound.value(), truncate_length));
+    result.upper_bound = std::move(upper);
+  }
+
+  return result;
+}
+
+/// \brief Collect metrics for a single primitive field from footer statistics.
+Result<std::optional<FieldMetrics>> MetricsFromFooter(
+    int32_t field_id, std::shared_ptr<PrimitiveType> iceberg_type,
+    const ::parquet::SchemaDescriptor& parquet_schema,
+    const ::parquet::FileMetaData& metadata, int32_t truncate_length) {
+  const auto column_idx = FindColumnIndex(parquet_schema, field_id);
+  if (!column_idx.has_value()) {
+    return std::nullopt;
+  }
+
+  // No metrics are produced for INT96 columns.
+  const auto* column_desc = parquet_schema.Column(column_idx.value());
+  if (column_desc->physical_type() == ::parquet::Type::INT96) {
+    return std::nullopt;
+  }
+
+  // A non-positive truncate length means counts only, no bounds.
+  if (truncate_length <= 0) {
+    return CollectCounts(field_id, metadata, column_idx.value());
+  }
+
+  return CollectBounds(field_id, iceberg_type, metadata, column_idx.value(),
+                       truncate_length);
+}
+
+/// \brief Visitor for collecting metrics from all primitive fields in a schema.
+///
+/// Struct fields are walked recursively with a dotted name prefix; list and map
+/// types are visited as no-ops, so nothing beneath them is collected.
+class CollectMetricsVisitor {
+ public:
+  /// \brief Bind the visitor to its inputs and to the Metrics object it fills in.
+  ///
+  /// All arguments are stored as references and must outlive the visitor.
+  CollectMetricsVisitor(const ::parquet::SchemaDescriptor& parquet_schema,
+                        const MetricsConfig& metrics_config,
+                        const ::parquet::FileMetaData& metadata,
+                        const std::unordered_map<int32_t, FieldMetrics>& field_metrics,
+                        Metrics& metrics)
+      : parquet_schema_(parquet_schema),
+        metrics_config_(metrics_config),
+        metadata_(metadata),
+        field_metrics_(field_metrics),
+        metrics_(metrics) {}
+
+  /// \brief Visit every field of a struct, extending `prefix` with the field name.
+  Status VisitStruct(const StructType& type, const std::string& prefix) {
+    for (const auto& field : type.fields()) {
+      std::string full_name = prefix.empty()
+                                  ? std::string(field.name())
+                                  : prefix + "." + std::string(field.name());
+      ICEBERG_RETURN_UNEXPECTED(VisitField(field, full_name));
+    }
+    return {};
+  }
+
+  /// \brief Lists are not descended into.
+  Status VisitList(const ListType& /*type*/, const std::string& /*prefix*/) { return {}; }
+
+  /// \brief Maps are not descended into.
+  Status VisitMap(const MapType& /*type*/, const std::string& /*prefix*/) { return {}; }
+
+  /// \brief Bare primitive types carry no field and therefore no metrics.
+  Status VisitPrimitive(const PrimitiveType& /*type*/, const std::string& /*prefix*/) {
+    return {};
+  }
+
+ private:
+  /// \brief Dispatch a field: primitives are processed, nested types are recursed.
+  Status VisitField(const SchemaField& field, const std::string& full_name) {
+    if (field.type()->is_primitive()) {
+      return ProcessPrimitiveField(field, full_name);
+    }
+    if (field.type()->is_nested()) {
+      return VisitTypeCategory(*field.type(), this, full_name);
+    }
+    return {};
+  }
+
+  /// \brief Collect metrics for one primitive field according to its metrics mode.
+  ///
+  /// Pre-computed field metrics win over footer statistics.
+  Status ProcessPrimitiveField(const SchemaField& field, const std::string& full_name) {
+    const int32_t field_id = field.field_id();
+    const MetricsMode mode = metrics_config_.ColumnMode(full_name);
+    if (mode.kind == MetricsMode::Kind::kNone) {
+      return {};
+    }
+
+    const int32_t truncate_length = mode.TruncateLength();
+    const auto primitive_type =
+        internal::checked_pointer_cast<PrimitiveType>(field.type());
+
+    ICEBERG_ASSIGN_OR_RAISE(auto precomputed,
+                            MetricsFromFieldMetrics(field_id, field_metrics_,
+                                                    primitive_type, truncate_length));
+    if (precomputed.has_value()) {
+      ApplyFieldMetrics(field_id, std::move(precomputed.value()));
+      return {};
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto footer_metrics,
+                            MetricsFromFooter(field_id, primitive_type, parquet_schema_,
+                                              metadata_, truncate_length));
+    if (footer_metrics.has_value()) {
+      ApplyFieldMetrics(field_id, std::move(footer_metrics.value()));
+    }
+    return {};
+  }
+
+  /// \brief Copy the known parts of `fm` into the output Metrics.
+  ///
+  /// Negative counts mean "unknown" and are skipped. Bounds are emplaced, so an
+  /// existing bound for the same field ID is left untouched.
+  void ApplyFieldMetrics(int32_t field_id, FieldMetrics&& fm) {
+    if (fm.value_count >= 0) {
+      metrics_.value_counts[field_id] = fm.value_count;
+    }
+    if (fm.null_value_count >= 0) {
+      metrics_.null_value_counts[field_id] = fm.null_value_count;
+    }
+    if (fm.nan_value_count >= 0) {
+      metrics_.nan_value_counts[field_id] = fm.nan_value_count;
+    }
+    if (fm.lower_bound.has_value()) {
+      metrics_.lower_bounds.emplace(field_id, std::move(fm.lower_bound.value()));
+    }
+    if (fm.upper_bound.has_value()) {
+      metrics_.upper_bounds.emplace(field_id, std::move(fm.upper_bound.value()));
+    }
+  }
+
+  const ::parquet::SchemaDescriptor& parquet_schema_;
+  const MetricsConfig& metrics_config_;
+  const ::parquet::FileMetaData& metadata_;
+  const std::unordered_map<int32_t, FieldMetrics>& field_metrics_;
+  Metrics& metrics_;
+};
+
+}  // namespace
+
+Result<Metrics> ParquetMetrics::GetMetrics(
+    const Schema& schema, const ::parquet::SchemaDescriptor& parquet_schema,
+    const MetricsConfig& metrics_config, const ::parquet::FileMetaData& metadata,
+    const std::unordered_map<int32_t, FieldMetrics>& field_metrics) {
+  Metrics metrics;
+
+  // Collect row count and column sizes
+  int64_t row_count = 0;
+  for (int rg = 0; rg < metadata.num_row_groups(); ++rg) {
+    auto row_group = metadata.RowGroup(rg);
+    row_count += row_group->num_rows();
+    for (int col = 0; col < row_group->num_columns(); ++col) {
+      // Columns without an Iceberg field ID, or whose ID is not in the schema, are
+      // not tracked.
+      const auto field_id_opt = GetFieldId(*parquet_schema.Column(col));
+      if (!field_id_opt.has_value()) {
+        continue;
+      }
+      const int32_t field_id = field_id_opt.value();
+
+      ICEBERG_ASSIGN_OR_RAISE(auto field_name, schema.FindColumnNameById(field_id));
+      if (!field_name.has_value()) {
+        continue;
+      }
+
+      const MetricsMode mode = metrics_config.ColumnMode(field_name.value());
+      if (mode.kind == MetricsMode::Kind::kNone) {
+        continue;
+      }
+
+      // operator[] value-initializes a missing entry to 0, so += both creates and
+      // accumulates the per-field size with a single lookup.
+      auto column_chunk = row_group->ColumnChunk(col);
+      metrics.column_sizes[field_id] += column_chunk->total_compressed_size();
+    }
+  }
+  metrics.row_count = row_count;
+
+  // Collect metrics for all primitive fields
+  CollectMetricsVisitor visitor(parquet_schema, metrics_config, metadata, field_metrics,
+                                metrics);
+  ICEBERG_RETURN_UNEXPECTED(visitor.VisitStruct(schema, ""));
+
+  return metrics;
+}
+
+}  // namespace iceberg::parquet
diff --git a/src/iceberg/parquet/parquet_metrics.h b/src/iceberg/parquet/parquet_metrics.h
new file mode 100644
index 000000000..5b3a87a16
--- /dev/null
+++ b/src/iceberg/parquet/parquet_metrics.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/parquet/parquet_metrics.h
+/// \brief Utilities for extracting metrics from Parquet files.
+
+#include <unordered_map>
+
+#include <parquet/metadata.h>
+
+#include "iceberg/iceberg_bundle_export.h"
+#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+
+namespace iceberg::parquet {
+
+/// \brief Utility class for computing metrics from Parquet files.
+class ICEBERG_BUNDLE_EXPORT ParquetMetrics { + public: + ParquetMetrics() = delete; + + /// \brief Compute file-level metrics from Parquet file metadata. + /// + /// This function extracts metrics including row count, column sizes, value counts, + /// null value counts, and lower/upper bounds from Parquet file metadata. + /// The metrics are computed according to the provided MetricsConfig, which determines + /// which columns to collect metrics for and at what granularity (counts only, truncated + /// bounds, or full bounds). + /// + /// \param schema The Iceberg schema for the table. + /// \param parquet_schema The Parquet schema descriptor. + /// \param metrics_config The configuration specifying how to collect metrics. + /// \param metadata The Parquet file metadata containing row group statistics. + /// \param field_metrics Optional per-field metrics computed during write. + /// If provided, these take precedence over footer statistics. + /// \return Result containing the computed Metrics or an error. 
+ static Result GetMetrics( + const Schema& schema, const ::parquet::SchemaDescriptor& parquet_schema, + const MetricsConfig& metrics_config, const ::parquet::FileMetaData& metadata, + const std::unordered_map& field_metrics = {}); +}; + +} // namespace iceberg::parquet diff --git a/src/iceberg/parquet/parquet_writer.cc b/src/iceberg/parquet/parquet_writer.cc index 7e2d3d151..c0e7b9691 100644 --- a/src/iceberg/parquet/parquet_writer.cc +++ b/src/iceberg/parquet/parquet_writer.cc @@ -31,6 +31,7 @@ #include "iceberg/arrow/arrow_io_internal.h" #include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/parquet/parquet_metrics.h" #include "iceberg/schema_internal.h" #include "iceberg/util/macros.h" @@ -76,6 +77,8 @@ Result> ParseCodecLevel(const WriterProperties& propertie class ParquetWriter::Impl { public: Status Open(const WriterOptions& options) { + schema_ = options.schema; + ICEBERG_ASSIGN_OR_RAISE(auto compression, ParseCompression(options.properties)); ICEBERG_ASSIGN_OR_RAISE(auto compression_level, ParseCodecLevel(options.properties)); @@ -88,15 +91,14 @@ class ParquetWriter::Impl { auto arrow_writer_properties = ::parquet::default_arrow_writer_properties(); ArrowSchema c_schema; - ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*options.schema, &c_schema)); + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema_, &c_schema)); ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema_, ::arrow::ImportSchema(&c_schema)); - std::shared_ptr<::parquet::SchemaDescriptor> schema_descriptor; ICEBERG_ARROW_RETURN_NOT_OK( ::parquet::arrow::ToParquetSchema(arrow_schema_.get(), *writer_properties, - *arrow_writer_properties, &schema_descriptor)); + *arrow_writer_properties, &parquet_schema_)); auto schema_node = std::static_pointer_cast<::parquet::schema::GroupNode>( - schema_descriptor->schema_root()); + parquet_schema_->schema_root()); ICEBERG_ASSIGN_OR_RAISE(output_stream_, OpenOutputStream(options)); auto file_writer = ::parquet::ParquetFileWriter::Open( @@ -106,6 +108,8 @@ class 
ParquetWriter::Impl { ::parquet::arrow::FileWriter::Make(pool_, std::move(file_writer), arrow_schema_, std::move(arrow_writer_properties), &writer_)); + metrics_config_ = options.metrics_config; + return {}; } @@ -130,6 +134,7 @@ class ParquetWriter::Impl { for (int i = 0; i < metadata->num_row_groups(); ++i) { split_offsets_.push_back(metadata->RowGroup(i)->file_offset()); } + metadata_ = writer_->metadata(); writer_.reset(); ICEBERG_ARROW_ASSIGN_OR_RETURN(total_bytes_, output_stream_->Tell()); @@ -152,15 +157,34 @@ class ParquetWriter::Impl { std::vector split_offsets() const { return split_offsets_; } + Result metrics() { + if (writer_) { + return Invalid("Cannot return metrics for unclosed writer"); + } + if (!metadata_) { + return Metrics(); + } + return ParquetMetrics::GetMetrics(*schema_, *parquet_schema_, *metrics_config_, + *metadata_, {}); + } + private: // TODO(gangwu): make memory pool configurable ::arrow::MemoryPool* pool_ = ::arrow::default_memory_pool(); + // Schema to write from the Iceberg table. + std::shared_ptr schema_; // Schema to write from the Parquet file. std::shared_ptr<::arrow::Schema> arrow_schema_; + // Parquet schema descriptor generated from the Arrow schema. + std::shared_ptr<::parquet::SchemaDescriptor> parquet_schema_; + // Metrics config for collecting metrics during write. + std::shared_ptr metrics_config_; // The output stream to write Parquet file. std::shared_ptr<::arrow::io::OutputStream> output_stream_; // Parquet file writer to write ArrowArray. std::unique_ptr<::parquet::arrow::FileWriter> writer_; + // Store the metadata if writer has been closed. + std::shared_ptr<::parquet::FileMetaData> metadata_; // Total length of the written Parquet file. int64_t total_bytes_{0}; // Row group start offsets in the Parquet file. 
@@ -178,12 +202,7 @@ Status ParquetWriter::Write(ArrowArray* array) { return impl_->Write(array); } Status ParquetWriter::Close() { return impl_->Close(); } -Result ParquetWriter::metrics() { - if (!impl_->Closed()) { - return Invalid("ParquetWriter is not closed"); - } - return {}; -} +Result ParquetWriter::metrics() { return impl_->metrics(); } Result ParquetWriter::length() { return impl_->length(); } diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1d80b29a5..2e3122878 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -188,7 +188,9 @@ if(ICEBERG_BUILD_BUNDLE) add_iceberg_test(parquet_test USE_BUNDLE SOURCES + metrics_test_base.cc parquet_data_test.cc + parquet_metrics_test.cc parquet_schema_test.cc parquet_test.cc) diff --git a/src/iceberg/test/metrics_test_base.cc b/src/iceberg/test/metrics_test_base.cc new file mode 100644 index 000000000..5fc97c069 --- /dev/null +++ b/src/iceberg/test/metrics_test_base.cc @@ -0,0 +1,1020 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "iceberg/test/metrics_test_base.h" + +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/arrow/arrow_status_internal.h" +#include "iceberg/schema_internal.h" +#include "iceberg/test/matchers.h" +#include "iceberg/type.h" +#include "iceberg/util/decimal.h" + +namespace iceberg::test { + +void MetricsTestBase::SetUp() { + file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + temp_dir_ = "metrics_test"; +} + +void MetricsTestBase::AssertCounts(int field_id, + std::optional expected_value_count, + std::optional expected_null_count, + const Metrics& metrics) { + if (expected_value_count.has_value()) { + ASSERT_TRUE(metrics.value_counts.contains(field_id)) + << "Field " << field_id << " should have value count"; + EXPECT_EQ(metrics.value_counts.at(field_id), expected_value_count.value()) + << "Field " << field_id << " value count mismatch"; + } else { + EXPECT_FALSE(metrics.value_counts.contains(field_id)) + << "Field " << field_id << " should not have value count"; + } + + if (expected_null_count.has_value()) { + ASSERT_TRUE(metrics.null_value_counts.contains(field_id)) + << "Field " << field_id << " should have null count"; + EXPECT_EQ(metrics.null_value_counts.at(field_id), expected_null_count.value()) + << "Field " << field_id << " null count mismatch"; + } else { + EXPECT_FALSE(metrics.null_value_counts.contains(field_id)) + << "Field " << field_id << " should not have null count"; + } +} + +void MetricsTestBase::AssertCounts(int field_id, + std::optional expected_value_count, + std::optional expected_null_count, + std::optional expected_nan_count, + const Metrics& metrics) { + AssertCounts(field_id, expected_value_count, expected_null_count, metrics); + + if (expected_nan_count.has_value()) { + ASSERT_TRUE(metrics.nan_value_counts.contains(field_id)) + << "Field " << field_id << " should have NaN count"; + EXPECT_EQ(metrics.nan_value_counts.at(field_id), expected_nan_count.value()) + << 
"Field " << field_id << " NaN count mismatch"; + } else { + EXPECT_FALSE(metrics.nan_value_counts.contains(field_id)) + << "Field " << field_id << " should not have NaN count"; + } +} + +template +void MetricsTestBase::AssertBounds(int field_id, std::shared_ptr type, + std::optional expected_lower, + std::optional expected_upper, + const Metrics& metrics) { + if (expected_lower.has_value()) { + ASSERT_TRUE(metrics.lower_bounds.contains(field_id)) + << "Field " << field_id << " should have lower bound"; + const auto& literal = metrics.lower_bounds.at(field_id); + ASSERT_FALSE(literal.IsNull()) + << "Field " << field_id << " lower bound literal should not be null"; + EXPECT_EQ(std::get(literal.value()), expected_lower.value()) + << "Field " << field_id << " lower bound mismatch"; + } else { + EXPECT_FALSE(metrics.lower_bounds.contains(field_id)); + } + + if (expected_upper.has_value()) { + ASSERT_TRUE(metrics.upper_bounds.contains(field_id)) + << "Field " << field_id << " should have upper bound"; + const auto& literal = metrics.upper_bounds.at(field_id); + ASSERT_FALSE(literal.IsNull()) + << "Field " << field_id << " upper bound literal should not be null"; + EXPECT_EQ(std::get(literal.value()), expected_upper.value()) + << "Field " << field_id << " upper bound mismatch"; + } else { + EXPECT_FALSE(metrics.upper_bounds.contains(field_id)); + } +} + +// Explicit template instantiations for common types +template void MetricsTestBase::AssertBounds(int, std::shared_ptr, + std::optional, + std::optional, const Metrics&); +template void MetricsTestBase::AssertBounds(int, std::shared_ptr, + std::optional, + std::optional, + const Metrics&); +template void MetricsTestBase::AssertBounds(int, std::shared_ptr, + std::optional, + std::optional, + const Metrics&); +template void MetricsTestBase::AssertBounds(int, std::shared_ptr, + std::optional, + std::optional, const Metrics&); +template void MetricsTestBase::AssertBounds(int, std::shared_ptr, + std::optional, + std::optional, 
+ const Metrics&); +template void MetricsTestBase::AssertBounds(int, + std::shared_ptr, + std::optional, + std::optional, + const Metrics&); +template void MetricsTestBase::AssertBounds>( + int, std::shared_ptr, std::optional>, + std::optional>, const Metrics&); + +template void MetricsTestBase::AssertBounds(int, std::shared_ptr, + std::optional, + std::optional, + const Metrics&); + +std::shared_ptr<::arrow::Array> MetricsTestBase::CreateRecordArrays( + const std::shared_ptr<::arrow::Schema>& arrow_schema, const std::string& json_data) { + auto struct_type = ::arrow::struct_(arrow_schema->fields()); + return ::arrow::json::ArrayFromJSONString(struct_type, json_data).ValueOrDie(); +} + +std::shared_ptr MetricsTestBase::SimpleSchema() { + return std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "booleanCol", boolean()), + SchemaField::MakeRequired(2, "intCol", int32()), + SchemaField::MakeOptional(3, "longCol", int64()), + SchemaField::MakeRequired(4, "floatCol", float32()), + SchemaField::MakeOptional(5, "doubleCol", float64()), + SchemaField::MakeOptional(6, "decimalCol", decimal(10, 2)), + SchemaField::MakeRequired(7, "stringCol", string()), + SchemaField::MakeOptional(8, "dateCol", date()), + SchemaField::MakeRequired(9, "timeCol", time()), + SchemaField::MakeRequired(10, "timestampColAboveEpoch", timestamp()), + SchemaField::MakeRequired(11, "fixedCol", fixed(4)), + SchemaField::MakeRequired(12, "binaryCol", binary()), + SchemaField::MakeRequired(13, "timestampColBelowEpoch", timestamp()), + }); +} + +std::shared_ptr MetricsTestBase::NestedSchema() { + auto leaf_struct = struct_({ + SchemaField::MakeOptional(5, "leafLongCol", int64()), + SchemaField::MakeOptional(6, "leafBinaryCol", binary()), + }); + + auto nested_struct = struct_({ + SchemaField::MakeRequired(3, "longCol", int64()), + SchemaField::MakeRequired(4, "leafStructCol", leaf_struct), + SchemaField::MakeRequired(7, "doubleCol", float64()), + }); + + return std::make_shared(std::vector{ + 
SchemaField::MakeRequired(1, "intCol", int32()), + SchemaField::MakeRequired(2, "nestedStructCol", nested_struct), + }); +} + +std::shared_ptr MetricsTestBase::FloatDoubleSchema() { + return std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "floatCol", float32()), + SchemaField::MakeOptional(2, "doubleCol", float64()), + }); +} + +Result> ToArrowSchema(std::shared_ptr schema) { + ArrowSchema c_schema; + ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(*schema, &c_schema)); + std::shared_ptr<::arrow::Schema> arrow_schema; + ICEBERG_ARROW_ASSIGN_OR_RETURN(arrow_schema, ::arrow::ImportSchema(&c_schema)); + return arrow_schema; +} + +void MetricsTestBase::MetricsForRepeatedValues() { + auto schema = SimpleSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema)); + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildSimpleRecords(arrow_schema, 2)); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 2); + + AssertCounts(1, 2, 0, metrics); + AssertCounts(2, 2, 0, metrics); + AssertCounts(3, 2, 1, metrics); + // TODO() Assert NaN metrics + AssertCounts(4, 2, 0, metrics); // floatCol has 2 NaN values + AssertCounts(5, 2, 0, metrics); + AssertCounts(6, 2, 1, metrics); + AssertCounts(7, 2, 0, metrics); + AssertCounts(8, 2, 0, metrics); + AssertCounts(9, 2, 0, metrics); + AssertCounts(10, 2, 0, metrics); + AssertCounts(11, 2, 0, metrics); + AssertCounts(12, 2, 0, metrics); + AssertCounts(13, 2, 0, metrics); +} + +void MetricsTestBase::MetricsForTopLevelFields() { + auto schema = SimpleSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema)); + + auto records = CreateRecordArrays(arrow_schema, R"([ + {"booleanCol": true, "intCol": 3, "longCol": 5, "floatCol": 2.0, "doubleCol": 2.0, + "decimalCol": "3.50", "stringCol": "AAA", "dateCol": 1500, "timeCol": 2000, + "timestampColAboveEpoch": 0, "fixedCol": "abcd", "binaryCol": 
"S", "timestampColBelowEpoch": -1900300}, + {"booleanCol": false, "intCol": -2147483648, "longCol": null, "floatCol": 1.0, "doubleCol": null, + "decimalCol": null, "stringCol": "ZZZ", "dateCol": null, "timeCol": 3000, + "timestampColAboveEpoch": 900, "fixedCol": "abcd", "binaryCol": "W", "timestampColBelowEpoch": -7000} + ])"); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 2); + + AssertCounts(1, 2, 0, metrics); + AssertBounds(1, boolean(), false, true, metrics); + AssertCounts(2, 2, 0, metrics); + AssertBounds(2, int32(), std::numeric_limits::min(), 3, metrics); + AssertCounts(3, 2, 1, metrics); + AssertBounds(3, int64(), 5, 5, metrics); + AssertCounts(4, 2, 0, metrics); + AssertBounds(4, float32(), 1.0F, 2.0F, metrics); + AssertCounts(5, 2, 1, metrics); + AssertBounds(5, float64(), 2.0, 2.0, metrics); + AssertCounts(6, 2L, 1L, metrics); + AssertBounds(6, std::make_shared(10, 2), Decimal(350), + Decimal(350), metrics); + AssertCounts(7, 2, 0, metrics); + AssertBounds(7, string(), std::string("AAA"), std::string("ZZZ"), metrics); + + AssertCounts(8, 2, 1, metrics); + AssertBounds(8, date(), 1500, 1500, metrics); + + AssertCounts(9, 2, 0, metrics); + AssertBounds(9, time(), 2000, 3000, metrics); + + AssertCounts(10, 2, 0, metrics); + AssertBounds(10, timestamp(), 0, 900, metrics); + + AssertCounts(11, 2, 0, metrics); + std::vector fixed_val = {'a', 'b', 'c', 'd'}; + AssertBounds>(11, fixed(4), fixed_val, fixed_val, metrics); + + AssertCounts(12, 2, 0, metrics); + AssertBounds>(12, binary(), std::vector{'S'}, + std::vector{'W'}, metrics); + + AssertCounts(13, 2, 0, metrics); + AssertBounds(13, timestamp(), -1900300, -7000, metrics); +} + +void MetricsTestBase::MetricsForDecimals() { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "decimalAsInt32", decimal(4, 2)), + SchemaField::MakeRequired(2, 
"decimalAsInt64", decimal(14, 2)), + SchemaField::MakeRequired(3, "decimalAsFixed", decimal(22, 2)), + }); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("decimalAsInt32", ::arrow::decimal128(4, 2), false), + ::arrow::field("decimalAsInt64", ::arrow::decimal128(14, 2), false), + ::arrow::field("decimalAsFixed", ::arrow::decimal128(22, 2), false), + }); + + // Create decimal values + ::arrow::Decimal128Builder builder1(::arrow::decimal128(4, 2)); + ::arrow::Decimal128Builder builder2(::arrow::decimal128(14, 2)); + ::arrow::Decimal128Builder builder3(::arrow::decimal128(22, 2)); + + // 2.55, 4.75, 5.80 + ASSERT_TRUE(builder1.Append(::arrow::Decimal128("255")).ok()); // 2.55 with scale 2 + ASSERT_TRUE(builder2.Append(::arrow::Decimal128("475")).ok()); // 4.75 with scale 2 + ASSERT_TRUE(builder3.Append(::arrow::Decimal128("580")).ok()); // 5.80 with scale 2 + + auto array1 = builder1.Finish().ValueOrDie(); + auto array2 = builder2.Finish().ValueOrDie(); + auto array3 = builder3.Finish().ValueOrDie(); + + std::vector> field_arrays = {array1, array2, array3}; + auto records = + ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + AssertCounts(1, 1, 0, metrics); + // For decimals, bounds exist but we just check they're present + EXPECT_TRUE(metrics.lower_bounds.contains(1)); + EXPECT_TRUE(metrics.upper_bounds.contains(1)); + + AssertCounts(2, 1, 0, metrics); + EXPECT_TRUE(metrics.lower_bounds.contains(2)); + EXPECT_TRUE(metrics.upper_bounds.contains(2)); + + AssertCounts(3, 1, 0, metrics); + EXPECT_TRUE(metrics.lower_bounds.contains(3)); + EXPECT_TRUE(metrics.upper_bounds.contains(3)); +} + +void MetricsTestBase::MetricsForNestedStructFields() { + auto schema = NestedSchema(); + + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords()); + 
ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + AssertCounts(1, 1, 0, metrics); + AssertBounds(1, int32(), std::numeric_limits::min(), + std::numeric_limits::min(), metrics); + + AssertCounts(3, 1, 0, metrics); + AssertBounds(3, int64(), 100, 100, metrics); + + AssertCounts(5, 1, 0, metrics); + AssertBounds(5, int64(), 20, 20, metrics); + + AssertCounts(6, 1L, 0L, metrics); + AssertBounds>(6, binary(), std::vector{'A'}, + std::vector{'A'}, metrics); + + // TODO() Assert NaN metrics + AssertCounts(7, 1L, 0L, metrics); + AssertBounds(7, float64(), std::nullopt, std::nullopt, metrics); +} + +void MetricsTestBase::MetricsModeForNestedStructFields() { + auto schema = NestedSchema(); + + // Create MetricsConfig with custom column modes + // Default mode is None, but nestedStructCol.longCol should be Full + std::unordered_map properties = { + {"write.metadata.metrics.default", "none"}, + {"write.metadata.metrics.column.nestedStructCol.longCol", "full"}}; + + ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords()); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + // Only field 3 (nestedStructCol.longCol) should have bounds + EXPECT_EQ(metrics.lower_bounds.size(), 1); + EXPECT_EQ(metrics.upper_bounds.size(), 1); + AssertBounds(3, int64(), 100, 100, metrics); +} + +void MetricsTestBase::MetricsForListAndMapElements() { + // Create struct type for map values + auto leaf_struct = struct_({ + SchemaField::MakeRequired(1, "leafIntCol", int32()), + SchemaField::MakeOptional(2, "leafStringCol", string()), + }); + + // Create list and map types using constructors directly + auto list_type = list(SchemaField::MakeRequired(4, 
"element", int32())); + auto map_type = map(SchemaField::MakeRequired(6, "key", string()), + SchemaField::MakeRequired(7, "value", leaf_struct)); + + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(3, "intListCol", list_type), + SchemaField::MakeOptional(5, "mapCol", map_type), + }); + + // Create Arrow schema + auto arrow_leaf_struct = ::arrow::struct_({ + ::arrow::field("leafIntCol", ::arrow::int32(), false), + ::arrow::field("leafStringCol", ::arrow::utf8(), true), + }); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("intListCol", + ::arrow::list(::arrow::field("element", ::arrow::int32(), false)), + true), + ::arrow::field("mapCol", ::arrow::map(::arrow::utf8(), arrow_leaf_struct), true), + }); + + // Create list: [10, 11, 12] + ::arrow::Int32Builder int_builder; + ASSERT_TRUE(int_builder.Append(10).ok()); + ASSERT_TRUE(int_builder.Append(11).ok()); + ASSERT_TRUE(int_builder.Append(12).ok()); + auto int_array = int_builder.Finish().ValueOrDie(); + + ::arrow::ListBuilder list_builder(::arrow::default_memory_pool(), + std::make_shared<::arrow::Int32Builder>()); + ASSERT_TRUE(list_builder.Append().ok()); + auto list_value_builder = + static_cast<::arrow::Int32Builder*>(list_builder.value_builder()); + ASSERT_TRUE(list_value_builder->Append(10).ok()); + ASSERT_TRUE(list_value_builder->Append(11).ok()); + ASSERT_TRUE(list_value_builder->Append(12).ok()); + auto list_array = list_builder.Finish().ValueOrDie(); + + // Create map: {"4" -> {leafIntCol: 1, leafStringCol: "BBB"}} + // MapArray needs offsets, keys, and items (struct values) + ::arrow::Int32Builder offset_builder; + ASSERT_TRUE(offset_builder.Append(0).ok()); // Start offset + ASSERT_TRUE(offset_builder.Append(1).ok()); // End offset (1 entry) + auto offsets = offset_builder.Finish().ValueOrDie(); + + ::arrow::StringBuilder key_builder; + ASSERT_TRUE(key_builder.Append("4").ok()); + auto keys = key_builder.Finish().ValueOrDie(); + + ::arrow::Int32Builder 
struct_int_builder; + ::arrow::StringBuilder struct_str_builder; + ASSERT_TRUE(struct_int_builder.Append(1).ok()); + ASSERT_TRUE(struct_str_builder.Append("BBB").ok()); + auto struct_int_array = struct_int_builder.Finish().ValueOrDie(); + auto struct_str_array = struct_str_builder.Finish().ValueOrDie(); + auto items = + ::arrow::StructArray::Make({struct_int_array, struct_str_array}, + {::arrow::field("leafIntCol", ::arrow::int32(), false), + ::arrow::field("leafStringCol", ::arrow::utf8(), true)}) + .ValueOrDie(); + + auto map_array = ::arrow::MapArray::FromArrays(offsets, keys, items).ValueOrDie(); + + std::vector> field_arrays = {list_array, map_array}; + auto records = + ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + // For list and map elements, metrics should not be collected + // Field IDs: 1 (leafIntCol), 2 (leafStringCol), 4 (list element), 6 (map key), 7 (map + // value) + AssertCounts(1, std::nullopt, std::nullopt, metrics); + AssertCounts(2, std::nullopt, std::nullopt, metrics); + AssertCounts(4, std::nullopt, std::nullopt, metrics); + AssertCounts(6, std::nullopt, std::nullopt, metrics); + + AssertBounds(1, int32(), std::nullopt, std::nullopt, metrics); + AssertBounds(2, string(), std::nullopt, std::nullopt, metrics); + AssertBounds(4, int32(), std::nullopt, std::nullopt, metrics); + AssertBounds(6, string(), std::nullopt, std::nullopt, metrics); + ASSERT_FALSE(metrics.lower_bounds.contains(7)); + ASSERT_FALSE(metrics.upper_bounds.contains(7)); +} + +void MetricsTestBase::MetricsForNullColumns() { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "intCol", int32()), + }); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("intCol", ::arrow::int32(), true), + }); + + auto records = 
CreateRecordArrays(arrow_schema, R"([ + {"intCol": null}, + {"intCol": null} + ])"); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 2); + AssertCounts(1, 2, 2, metrics); + AssertBounds(1, int32(), std::nullopt, std::nullopt, metrics); +} + +void MetricsTestBase::MetricsForNaNColumns() { + auto schema = FloatDoubleSchema(); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("floatCol", ::arrow::float32(), true), + ::arrow::field("doubleCol", ::arrow::float64(), true), + }); + + ::arrow::FloatBuilder float_builder; + ::arrow::DoubleBuilder double_builder; + + ASSERT_TRUE(float_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(double_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(float_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(double_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + + auto float_array = float_builder.Finish().ValueOrDie(); + auto double_array = double_builder.Finish().ValueOrDie(); + + std::vector> field_arrays = {float_array, double_array}; + auto records = + ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 2); + // TODO() Assert NaN metrics + AssertCounts(1, 2, 0, metrics); + AssertCounts(2, 2, 0, metrics); + + // When all values are NaN, bounds should not be set + AssertBounds(1, float32(), std::nullopt, std::nullopt, metrics); + AssertBounds(2, float64(), std::nullopt, std::nullopt, metrics); +} + +void MetricsTestBase::ColumnBoundsWithNaNValueAtFront() { + auto schema = FloatDoubleSchema(); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("floatCol", ::arrow::float32(), true), + ::arrow::field("doubleCol", 
::arrow::float64(), true), + }); + + ::arrow::FloatBuilder float_builder; + ::arrow::DoubleBuilder double_builder; + + // NaN, 1.2, 5.6 + ASSERT_TRUE(float_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(double_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(float_builder.Append(1.2F).ok()); + ASSERT_TRUE(double_builder.Append(3.4).ok()); + ASSERT_TRUE(float_builder.Append(5.6F).ok()); + ASSERT_TRUE(double_builder.Append(7.8).ok()); + + auto float_array = float_builder.Finish().ValueOrDie(); + auto double_array = double_builder.Finish().ValueOrDie(); + + std::vector> field_arrays = {float_array, double_array}; + auto records = + ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 3); + // TODO() Assert NaN metrics + AssertCounts(1, 3, 0, metrics); + AssertCounts(2, 3, 0, metrics); + + // Bounds should be computed from non-NaN values + if (metrics.lower_bounds.contains(1)) { + AssertBounds(1, float32(), 1.2F, 5.6F, metrics); + AssertBounds(2, float64(), 3.4, 7.8, metrics); + } +} + +void MetricsTestBase::ColumnBoundsWithNaNValueInMiddle() { + auto schema = FloatDoubleSchema(); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("floatCol", ::arrow::float32(), true), + ::arrow::field("doubleCol", ::arrow::float64(), true), + }); + + ::arrow::FloatBuilder float_builder; + ::arrow::DoubleBuilder double_builder; + + // 1.2, NaN, 5.6 + ASSERT_TRUE(float_builder.Append(1.2F).ok()); + ASSERT_TRUE(double_builder.Append(3.4).ok()); + ASSERT_TRUE(float_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(double_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(float_builder.Append(5.6F).ok()); + ASSERT_TRUE(double_builder.Append(7.8).ok()); + + auto float_array = 
float_builder.Finish().ValueOrDie(); + auto double_array = double_builder.Finish().ValueOrDie(); + + std::vector> field_arrays = {float_array, double_array}; + auto records = + ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 3); + AssertCounts(1, 3, 0, metrics); + AssertCounts(2, 3, 0, metrics); + + if (metrics.lower_bounds.contains(1)) { + AssertBounds(1, float32(), 1.2F, 5.6F, metrics); + AssertBounds(2, float64(), 3.4, 7.8, metrics); + } +} + +void MetricsTestBase::ColumnBoundsWithNaNValueAtEnd() { + auto schema = FloatDoubleSchema(); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("floatCol", ::arrow::float32(), true), + ::arrow::field("doubleCol", ::arrow::float64(), true), + }); + + ::arrow::FloatBuilder float_builder; + ::arrow::DoubleBuilder double_builder; + + // 1.2, 5.6, NaN + ASSERT_TRUE(float_builder.Append(1.2F).ok()); + ASSERT_TRUE(double_builder.Append(3.4).ok()); + ASSERT_TRUE(float_builder.Append(5.6F).ok()); + ASSERT_TRUE(double_builder.Append(7.8).ok()); + ASSERT_TRUE(float_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + ASSERT_TRUE(double_builder.Append(std::numeric_limits::quiet_NaN()).ok()); + + auto float_array = float_builder.Finish().ValueOrDie(); + auto double_array = double_builder.Finish().ValueOrDie(); + + std::vector> field_arrays = {float_array, double_array}; + auto records = + ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); + + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 3); + AssertCounts(1, 3, 0, metrics); + AssertCounts(2, 3, 0, metrics); + + if (metrics.lower_bounds.contains(1)) { + AssertBounds(1, float32(), 1.2F, 5.6F, metrics); + 
AssertBounds(2, float64(), 3.4, 7.8, metrics); + } +} + +void MetricsTestBase::MetricsForTopLevelWithMultipleRowGroup() { + auto schema = SimpleSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema)); + + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildSimpleRecords(arrow_schema, 201)); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + if (SupportsSmallRowGroups()) { + ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount()); + EXPECT_EQ(split_count, 3); + } + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 201); + + // Verify metrics are collected for top-level fields + AssertCounts(1, 201, 0, metrics); + AssertBounds(1, boolean(), false, true, metrics); + AssertCounts(2, 201, 0, metrics); + AssertBounds(2, int32(), 3, 203, metrics); + AssertCounts(3, 201, 1, metrics); + AssertBounds(3, int64(), 1, 200, metrics); + AssertCounts(4, 201, 0, metrics); + AssertBounds(4, float32(), 2.0F, 201.0F, metrics); + AssertCounts(5, 201, 0, metrics); + AssertBounds(5, float64(), 2.0, 201.0, metrics); + AssertCounts(6, 201L, 1L, metrics); + AssertBounds(6, std::make_shared(10, 2), Decimal(101), + Decimal(300), metrics); +} + +void MetricsTestBase::MetricsForNestedStructFieldsWithMultipleRowGroup() { + auto schema = NestedSchema(); + ICEBERG_UNWRAP_OR_FAIL(auto arrow_schema, ToArrowSchema(schema)); + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords(201)); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, records)); + + if (SupportsSmallRowGroups()) { + ICEBERG_UNWRAP_OR_FAIL(auto split_count, GetSplitCount()); + EXPECT_EQ(split_count, 3); + } + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 201); + + // Verify metrics for top-level field + AssertCounts(1, 201, 0, metrics); + AssertBounds(1, int32(), std::numeric_limits::min(), + std::numeric_limits::min() + 200, metrics); + + // Verify metrics for nested struct 
fields + AssertCounts(3, 201, 0, metrics); + AssertBounds(3, int64(), 100, 100 + 200, metrics); + + AssertCounts(5, 201, 0, metrics); + AssertBounds(5, int64(), 20, 20 + 200, metrics); + + AssertCounts(6, 201, 0L, metrics); + AssertBounds>(6, binary(), std::vector{'A'}, + std::vector{'A'}, metrics); + + AssertCounts(7, 201, 0L, metrics); + AssertBounds(7, float64(), std::nullopt, std::nullopt, metrics); +} + +void MetricsTestBase::NoneMetricsMode() { + auto schema = NestedSchema(); + + std::unordered_map properties = { + {"write.metadata.metrics.default", "none"}}; + + ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords()); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + // In None mode, column_sizes should be empty + EXPECT_TRUE(metrics.column_sizes.empty()); + + // All counts should be null + AssertCounts(1, std::nullopt, std::nullopt, metrics); + AssertBounds(1, int32(), std::nullopt, std::nullopt, metrics); + AssertCounts(3, std::nullopt, std::nullopt, metrics); + AssertBounds(3, int64(), std::nullopt, std::nullopt, metrics); + AssertCounts(5, std::nullopt, std::nullopt, metrics); + AssertBounds(5, int64(), std::nullopt, std::nullopt, metrics); + AssertCounts(6, std::nullopt, std::nullopt, metrics); + AssertBounds(6, binary(), std::nullopt, std::nullopt, metrics); + AssertCounts(7, std::nullopt, std::nullopt, metrics); + AssertBounds(7, float64(), std::nullopt, std::nullopt, metrics); +} + +void MetricsTestBase::CountsMetricsMode() { + auto schema = NestedSchema(); + + std::unordered_map properties = { + {"write.metadata.metrics.default", "counts"}}; + + ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords()); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, 
config, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + // In Counts mode, column_sizes should not be empty + EXPECT_FALSE(metrics.column_sizes.empty()); + + // Counts should be present but bounds should be null + AssertCounts(1, 1, 0, metrics); + AssertBounds(1, int32(), std::nullopt, std::nullopt, metrics); + AssertCounts(3, 1, 0, metrics); + AssertBounds(3, int64(), std::nullopt, std::nullopt, metrics); + AssertCounts(5, 1, 0, metrics); + AssertBounds(5, int64(), std::nullopt, std::nullopt, metrics); + AssertCounts(6, 1, 0, metrics); + AssertBounds(6, binary(), std::nullopt, std::nullopt, metrics); + AssertCounts(7, 1, 0, metrics); + AssertBounds(7, float64(), std::nullopt, std::nullopt, metrics); +} + +void MetricsTestBase::FullMetricsMode() { + auto schema = NestedSchema(); + + std::unordered_map properties = { + {"write.metadata.metrics.default", "full"}}; + + ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + ICEBERG_UNWRAP_OR_FAIL(auto records, BuildNestedRecords()); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + // In Full mode, column_sizes should not be empty + EXPECT_FALSE(metrics.column_sizes.empty()); + + // Both counts and bounds should be present + AssertCounts(1, 1, 0, metrics); + AssertBounds(1, int32(), std::numeric_limits::min(), + std::numeric_limits::min(), metrics); + AssertCounts(3, 1, 0, metrics); + AssertBounds(3, int64(), 100, 100, metrics); + AssertCounts(5, 1, 0, metrics); + AssertBounds(5, int64(), 20, 20, metrics); + AssertCounts(6, 1, 0, metrics); + AssertBounds>(6, binary(), std::vector{'A'}, + std::vector{'A'}, metrics); + AssertCounts(7, 1, 0, metrics); + AssertBounds(7, float64(), std::nullopt, std::nullopt, metrics); +} + +void MetricsTestBase::TruncateStringMetricsMode() { + auto 
schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "str_to_truncate", string()), + }); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("str_to_truncate", ::arrow::utf8(), false), + }); + + auto records = CreateRecordArrays(arrow_schema, R"([ + {"str_to_truncate": "Lorem ipsum dolor sit amet"} + ])"); + + std::unordered_map properties = { + {"write.metadata.metrics.default", "truncate(10)"}}; + + ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + // Column sizes should not be empty + EXPECT_FALSE(metrics.column_sizes.empty()); + + AssertCounts(1, 1, 0, metrics); + + // Bounds should be truncated to 10 characters + // Lower bound: "Lorem ipsu" (first 10 chars) + // Upper bound: "Lorem ipsv" (first 10 chars with last char incremented) + std::string expected_lower = "Lorem ipsu"; + std::string expected_upper = "Lorem ipsv"; + AssertBounds(1, string(), expected_lower, expected_upper, metrics); +} + +void MetricsTestBase::TruncateBinaryMetricsMode() { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "bin_to_truncate", binary()), + }); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("bin_to_truncate", ::arrow::binary(), false), + }); + + // Create binary data: {0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8, 0x9, 0x10, 0xA, 0xB} + ::arrow::BinaryBuilder builder; + std::vector data = {0x1, 0x2, 0x3, 0x4, 0x5, 0x6, + 0x7, 0x8, 0x9, 0x10, 0xA, 0xB}; + ASSERT_TRUE(builder.Append(data.data(), data.size()).ok()); + auto array = builder.Finish().ValueOrDie(); + + std::vector> field_arrays = {array}; + auto records = + ::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); + + std::unordered_map properties = { + {"write.metadata.metrics.default", "truncate(5)"}}; + + 
ICEBERG_UNWRAP_OR_FAIL(auto config, MetricsConfig::Make(properties)); + ICEBERG_UNWRAP_OR_FAIL(auto metrics, GetMetrics(schema, config, records)); + + ASSERT_TRUE(metrics.row_count.has_value()) << "row_count should be set"; + EXPECT_EQ(*metrics.row_count, 1); + + // Column sizes should not be empty + EXPECT_FALSE(metrics.column_sizes.empty()); + + AssertCounts(1, 1, 0, metrics); + + // Bounds should be truncated to 5 bytes + // Lower bound: {0x1, 0x2, 0x3, 0x4, 0x5} + // Upper bound: {0x1, 0x2, 0x3, 0x4, 0x6} (last byte incremented) + auto expected_lower = std::vector{0x1, 0x2, 0x3, 0x4, 0x5}; + auto expected_upper = std::vector{0x1, 0x2, 0x3, 0x4, 0x6}; + AssertBounds>(1, binary(), expected_lower, expected_upper, + metrics); +} + +Result> MetricsTestBase::BuildSimpleRecords( + std::shared_ptr<::arrow::Schema> arrow_schema, int32_t count) { + ::arrow::BooleanBuilder boolean_builder; + ::arrow::Int32Builder int_builder; + ::arrow::Int64Builder long_builder; + ::arrow::FloatBuilder float_builder; + ::arrow::DoubleBuilder double_builder; + ::arrow::Decimal128Builder decimal_builder(::arrow::decimal128(10, 2)); + ::arrow::StringBuilder string_builder; + ::arrow::Date32Builder date_builder; + ::arrow::Time64Builder time_builder(::arrow::time64(::arrow::TimeUnit::MICRO), + ::arrow::default_memory_pool()); + ::arrow::TimestampBuilder timestamp_above_builder( + ::arrow::timestamp(::arrow::TimeUnit::MICRO), ::arrow::default_memory_pool()); + ::arrow::FixedSizeBinaryBuilder fixed_builder(::arrow::fixed_size_binary(4)); + ::arrow::BinaryBuilder binary_builder; + ::arrow::TimestampBuilder timestamp_below_builder( + ::arrow::timestamp(::arrow::TimeUnit::MICRO), ::arrow::default_memory_pool()); + + // Append identical records + for (int i = 0; i < count; i++) { + ICEBERG_ARROW_RETURN_NOT_OK(boolean_builder.Append(i != 0)); + ICEBERG_ARROW_RETURN_NOT_OK(int_builder.Append(3 + i)); + ICEBERG_ARROW_RETURN_NOT_OK(i == 0 ? 
long_builder.AppendNull() + : long_builder.Append(i)); + ICEBERG_ARROW_RETURN_NOT_OK( + i == 0 ? float_builder.Append(std::numeric_limits::quiet_NaN()) + : float_builder.Append(1.0 + i)); + ICEBERG_ARROW_RETURN_NOT_OK( + i == 0 ? double_builder.Append(std::numeric_limits::quiet_NaN()) + : double_builder.Append(1.0 + i)); + ICEBERG_ARROW_RETURN_NOT_OK(i == 0 + ? decimal_builder.AppendNull() + : decimal_builder.Append(::arrow::Decimal128("100") + + i)); // 1.00 with scale 2 + ICEBERG_ARROW_RETURN_NOT_OK(string_builder.Append("AAA")); + ICEBERG_ARROW_RETURN_NOT_OK(date_builder.Append(1500 + i)); + ICEBERG_ARROW_RETURN_NOT_OK(time_builder.Append(2000 + i)); + ICEBERG_ARROW_RETURN_NOT_OK(timestamp_above_builder.Append(i + 1)); + ICEBERG_ARROW_RETURN_NOT_OK(fixed_builder.Append("abcd")); + ICEBERG_ARROW_RETURN_NOT_OK(binary_builder.Append("S")); + ICEBERG_ARROW_RETURN_NOT_OK(timestamp_below_builder.Append((i + 1) * -1)); + } + + auto boolean_array = boolean_builder.Finish().ValueOrDie(); + auto int_array = int_builder.Finish().ValueOrDie(); + auto long_array = long_builder.Finish().ValueOrDie(); + auto float_array = float_builder.Finish().ValueOrDie(); + auto double_array = double_builder.Finish().ValueOrDie(); + auto decimal_array = decimal_builder.Finish().ValueOrDie(); + auto string_array = string_builder.Finish().ValueOrDie(); + auto date_array = date_builder.Finish().ValueOrDie(); + auto time_array = time_builder.Finish().ValueOrDie(); + auto timestamp_above_array = timestamp_above_builder.Finish().ValueOrDie(); + auto fixed_array = fixed_builder.Finish().ValueOrDie(); + auto binary_array = binary_builder.Finish().ValueOrDie(); + auto timestamp_below_array = timestamp_below_builder.Finish().ValueOrDie(); + + std::vector> field_arrays = { + boolean_array, int_array, long_array, + float_array, double_array, decimal_array, + string_array, date_array, time_array, + timestamp_above_array, fixed_array, binary_array, + timestamp_below_array}; + return 
::arrow::StructArray::Make(field_arrays, arrow_schema->fields()).ValueOrDie(); +} + +Result> MetricsTestBase::BuildNestedRecords( + int32_t count) { + auto leaf_struct_type = ::arrow::struct_({ + ::arrow::field("leafLongCol", ::arrow::int64(), true), + ::arrow::field("leafBinaryCol", ::arrow::binary(), true), + }); + + auto nested_struct_type = ::arrow::struct_({ + ::arrow::field("longCol", ::arrow::int64(), false), + ::arrow::field("leafStructCol", leaf_struct_type, false), + ::arrow::field("doubleCol", ::arrow::float64(), false), + }); + + auto arrow_schema = ::arrow::schema({ + ::arrow::field("intCol", ::arrow::int32(), false), + ::arrow::field("nestedStructCol", nested_struct_type, false), + }); + + // Build leaf struct: {leafLongCol: 20, leafBinaryCol: "A"} + ::arrow::Int64Builder leaf_long_builder; + ::arrow::BinaryBuilder leaf_binary_builder; + ::arrow::Int64Builder nested_long_builder; + ::arrow::DoubleBuilder nested_double_builder; + ::arrow::Int32Builder int_builder; + + for (int32_t i = 0; i < count; i++) { + ICEBERG_ARROW_RETURN_NOT_OK(leaf_long_builder.Append(20 + i)); + ICEBERG_ARROW_RETURN_NOT_OK(leaf_binary_builder.Append("A")); + + // Build nested struct: {longCol: 100, leafStructCol: {...}, doubleCol: NaN} + + ICEBERG_ARROW_RETURN_NOT_OK(nested_long_builder.Append(100 + i)); + ICEBERG_ARROW_RETURN_NOT_OK( + nested_double_builder.Append(std::numeric_limits::quiet_NaN())); + + // Build top-level struct: {intCol: 2147483647, nestedStructCol: {...}} + ICEBERG_ARROW_RETURN_NOT_OK( + int_builder.Append(std::numeric_limits::min() + i)); + } + + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto leaf_long_array, leaf_long_builder.Finish()); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto leaf_binary_array, leaf_binary_builder.Finish()); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto leaf_struct_array, + ::arrow::StructArray::Make({leaf_long_array, leaf_binary_array}, + leaf_struct_type->fields())); + + // Build nested struct: {longCol: 100, leafStructCol: {...}, doubleCol: NaN} + 
ICEBERG_ARROW_ASSIGN_OR_RETURN(auto nested_long_array, nested_long_builder.Finish()); + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto nested_double_array, + nested_double_builder.Finish()); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto nested_struct_array, + ::arrow::StructArray::Make( + {nested_long_array, leaf_struct_array, nested_double_array}, + nested_struct_type->fields())); + + // Build top-level struct: {intCol: 2147483647, nestedStructCol: {...}} + ICEBERG_ARROW_ASSIGN_OR_RETURN(auto int_array, int_builder.Finish()); + ICEBERG_ARROW_ASSIGN_OR_RETURN( + auto records, ::arrow::StructArray::Make({int_array, nested_struct_array}, + arrow_schema->fields())); + return records; +} + +} // namespace iceberg::test diff --git a/src/iceberg/test/metrics_test_base.h b/src/iceberg/test/metrics_test_base.h new file mode 100644 index 000000000..1ad9c8823 --- /dev/null +++ b/src/iceberg/test/metrics_test_base.h @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include
+
+#include "iceberg/file_io.h"
+#include "iceberg/metrics.h"
+#include "iceberg/metrics_config.h"
+#include "iceberg/result.h"
+#include "iceberg/schema.h"
+
+namespace iceberg::test {
+
+/// \brief Base test class for metrics testing, similar to Java's TestMetrics
+///
+/// Provides shared metrics test cases and helpers for different file formats (Parquet,
+/// Avro, ORC); a format-specific fixture implements the pure virtual hooks below.
+class MetricsTestBase {
+ protected:
+  virtual void SetUp();
+
+  /// \brief Get metrics for the given schema and records
+  virtual Result GetMetrics(std::shared_ptr schema,
+                            std::shared_ptr<::arrow::Array> records) = 0;
+
+  /// \brief Get metrics with custom MetricsConfig
+  virtual Result GetMetrics(std::shared_ptr schema,
+                            std::shared_ptr config,
+                            std::shared_ptr<::arrow::Array> records) = 0;
+
+  /// \brief Create an output file for testing
+  virtual std::string CreateOutputFile() = 0;
+
+  /// \brief Get the number of row groups/splits in a file
+  virtual Result GetSplitCount() = 0;
+
+  /// \brief Whether the format supports small row groups for testing (default: false)
+  virtual bool SupportsSmallRowGroups() const { return false; }
+
+  // Helper methods for assertions
+  void AssertCounts(int field_id, std::optional expected_value_count,
+                    std::optional expected_null_count, const Metrics& metrics);
+
+  void AssertCounts(int field_id, std::optional expected_value_count,
+                    std::optional expected_null_count,
+                    std::optional expected_nan_count, const Metrics& metrics);
+
+  template
+  void AssertBounds(int field_id, std::shared_ptr type,
+                    std::optional expected_lower, std::optional expected_upper,
+                    const Metrics& metrics);
+
+  // Helper methods for creating test data
+  std::shared_ptr<::arrow::Array> CreateRecordArrays(
+      const std::shared_ptr<::arrow::Schema>& arrow_schema, const std::string& json_data);
+
+  // Common test schemas
+  static std::shared_ptr SimpleSchema();
+  static std::shared_ptr NestedSchema();
+  static std::shared_ptr FloatDoubleSchema();
+
+  // Test case bodies - DEFINE_METRICS_TESTS below wraps each one in a TEST_F
+  void MetricsForRepeatedValues();
+  void MetricsForTopLevelFields();
+  void MetricsForDecimals();
+  void MetricsForNestedStructFields();
+  void MetricsModeForNestedStructFields();
+  void MetricsForListAndMapElements();
+  void MetricsForNullColumns();
+  void MetricsForNaNColumns();
+  void ColumnBoundsWithNaNValueAtFront();
+  void ColumnBoundsWithNaNValueInMiddle();
+  void ColumnBoundsWithNaNValueAtEnd();
+  void MetricsForTopLevelWithMultipleRowGroup();
+  void MetricsForNestedStructFieldsWithMultipleRowGroup();
+  void NoneMetricsMode();
+  void CountsMetricsMode();
+  void FullMetricsMode();
+  void TruncateStringMetricsMode();
+  void TruncateBinaryMetricsMode();
+
+ private:
+  Result> BuildSimpleRecords(
+      std::shared_ptr<::arrow::Schema> arrow_schema, int32_t count = 1);
+  Result> BuildNestedRecords(int32_t count = 1);
+
+ protected:
+  std::shared_ptr file_io_;
+  std::string temp_dir_;
+  std::string path_;
+};
+
+#define DEFINE_METRICS_TEST_CASE(TestClass, Case) \
+  TEST_F(TestClass, Case) { Case(); }
+
+#define DEFINE_METRICS_TESTS(TestClass) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForRepeatedValues) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForTopLevelFields) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNestedStructFields) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNullColumns) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNaNColumns) \
+  DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueAtFront) \
+  DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueInMiddle) \
+  DEFINE_METRICS_TEST_CASE(TestClass, ColumnBoundsWithNaNValueAtEnd) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForDecimals) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForListAndMapElements) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsModeForNestedStructFields) \
+  DEFINE_METRICS_TEST_CASE(TestClass, NoneMetricsMode) \
+  DEFINE_METRICS_TEST_CASE(TestClass, CountsMetricsMode) \
+  DEFINE_METRICS_TEST_CASE(TestClass, FullMetricsMode) \
+  DEFINE_METRICS_TEST_CASE(TestClass, TruncateStringMetricsMode) \
+  DEFINE_METRICS_TEST_CASE(TestClass, TruncateBinaryMetricsMode) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForTopLevelWithMultipleRowGroup) \
+  DEFINE_METRICS_TEST_CASE(TestClass, MetricsForNestedStructFieldsWithMultipleRowGroup)
+
+}  // namespace iceberg::test
diff --git a/src/iceberg/test/parquet_metrics_test.cc b/src/iceberg/test/parquet_metrics_test.cc
new file mode 100644
index 000000000..5aad69402
--- /dev/null
+++ b/src/iceberg/test/parquet_metrics_test.cc
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+
+#include "iceberg/arrow/arrow_fs_file_io_internal.h"
+#include "iceberg/arrow/arrow_status_internal.h"
+#include "iceberg/file_writer.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/test/metrics_test_base.h"
+#include "iceberg/util/checked_cast.h"
+
+namespace iceberg::test {
+
+class ParquetMetricsTest : public MetricsTestBase, public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+  void SetUp() override {
+    MetricsTestBase::SetUp();
+    temp_parquet_file_ = "parquet_metrics_test.parquet";
+    writer_properties_ = WriterProperties::FromMap(
+        {{WriterProperties::kParquetCompression.key(), "uncompressed"}});
+  }
+
+  Result GetMetrics(std::shared_ptr schema,
+                    std::shared_ptr<::arrow::Array> records) override {
+    return GetMetrics(schema, MetricsConfig::Default(), records);
+  }
+
+  Result GetMetrics(std::shared_ptr schema,
+                    std::shared_ptr config,
+                    std::shared_ptr<::arrow::Array> records) override {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer, WriterFactoryRegistry::Open(FileFormatType::kParquet,
+                                                 {.path = temp_parquet_file_,
+                                                  .schema = schema,
+                                                  .io = file_io_,
+                                                  .metadata = {},
+                                                  .metrics_config = config,
+                                                  .properties = writer_properties_}));
+    ArrowArray arr;
+    ICEBERG_ARROW_RETURN_NOT_OK(::arrow::ExportArray(*records, &arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Write(&arr));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->metrics();
+  }
+
+  std::string CreateOutputFile() override { return temp_parquet_file_; }
+
+  Result GetSplitCount() override {
+    // Report a failed open through the Result instead of aborting the test process.
+    auto& io = internal::checked_cast(*file_io_);
+    ICEBERG_ARROW_ASSIGN_OR_RETURN(auto file, io.fs()->OpenInputFile(temp_parquet_file_));
+    return ::parquet::ReadMetaData(file)->num_row_groups();
+  }
+
+  bool SupportsSmallRowGroups() const override { return false; }
+
+ private:
+  std::string temp_parquet_file_;
+  WriterProperties writer_properties_;
+};
+
+DEFINE_METRICS_TESTS(ParquetMetricsTest);
+
+}  // namespace iceberg::test
diff --git a/src/iceberg/util/truncate_util.cc b/src/iceberg/util/truncate_util.cc
index aba22d17e..848ded2ad 100644
--- a/src/iceberg/util/truncate_util.cc
+++ b/src/iceberg/util/truncate_util.cc
@@ -24,6 +24,7 @@
 #include
 
 #include "iceberg/expression/literal.h"
+#include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
 
 namespace iceberg {
@@ -293,4 +294,26 @@ Result TruncateUtils::TruncateLiteralMax(const Literal& literal, int32_
   }
 }
 
+Result TruncateUtils::TruncateLowerBound(const PrimitiveType& type,
+                                         const Literal& value, int32_t width) {
+  switch (type.type_id()) {
+    case TypeId::kString:
+    case TypeId::kBinary:
+      return TruncateLiteral(value, width);
+    default:
+      return value;
+  }
+}
+
+Result TruncateUtils::TruncateUpperBound(const PrimitiveType& type,
+                                         const Literal& value, int32_t width) {
+  switch (type.type_id()) {
+    case TypeId::kString:
+    case TypeId::kBinary:
+      return TruncateLiteralMax(value, width);
+    default:
+      return value;
+  }
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/truncate_util.h b/src/iceberg/util/truncate_util.h
index 1a1824a24..a8cdfb17c 100644
--- a/src/iceberg/util/truncate_util.h
+++ b/src/iceberg/util/truncate_util.h
@@ -113,6 +113,30 @@ class ICEBERG_EXPORT TruncateUtils {
   /// or the smallest Literal greater than the truncated prefix, or an error if no such
   /// value exists or cannot be represented.
   static Result TruncateLiteralMax(const Literal& value, int32_t width);
+
+  /// \brief Truncate the lower bound of a string or binary value.
+  ///
+  /// For string/binary types, truncates to the given width. For other types, returns the
+  /// value unchanged.
+  ///
+  /// \param type The Iceberg primitive type; only string and binary values are truncated.
+  /// \param value The lower bound literal value.
+  /// \param width The width to truncate to.
+  /// \return The truncated lower bound literal, or the error from TruncateLiteral.
+  static Result TruncateLowerBound(const PrimitiveType& type,
+                                   const Literal& value, int32_t width);
+
+  /// \brief Truncate the upper bound of a string or binary value.
+  ///
+  /// For string/binary types, truncates to the smallest value greater than the truncated
+  /// prefix. For other types, returns the value unchanged.
+  ///
+  /// \param type The Iceberg primitive type; only string and binary values are truncated.
+  /// \param value The upper bound literal value.
+  /// \param width The width to truncate to.
+  /// \return The truncated upper bound literal, or the error from TruncateLiteralMax.
+  static Result TruncateUpperBound(const PrimitiveType& type,
+                                   const Literal& value, int32_t width);
 };
 
 }  // namespace iceberg