diff --git a/crates/catalog/glue/src/schema.rs b/crates/catalog/glue/src/schema.rs index 864320dae4..3fe9e83914 100644 --- a/crates/catalog/glue/src/schema.rs +++ b/crates/catalog/glue/src/schema.rs @@ -25,7 +25,7 @@ pub(crate) const ICEBERG_FIELD_CURRENT: &str = "iceberg.field.current"; use std::collections::HashMap; use aws_sdk_glue::types::Column; -use iceberg::spec::{PrimitiveType, SchemaVisitor, TableMetadata, visit_schema}; +use iceberg::spec::{PrimitiveType, SchemaVisitor, TableMetadata, VariantType, visit_schema}; use iceberg::{Error, ErrorKind, Result}; use crate::error::from_aws_build_error; @@ -182,6 +182,10 @@ impl SchemaVisitor for GlueSchemaBuilder { Ok(glue_type) } + + fn variant(&mut self, _v: &VariantType) -> Result { + Ok("variant".to_string()) + } } #[cfg(test)] diff --git a/crates/catalog/hms/src/schema.rs b/crates/catalog/hms/src/schema.rs index c23b48719d..54c21a8b12 100644 --- a/crates/catalog/hms/src/schema.rs +++ b/crates/catalog/hms/src/schema.rs @@ -16,7 +16,7 @@ // under the License. use hive_metastore::FieldSchema; -use iceberg::spec::{PrimitiveType, Schema, SchemaVisitor, visit_schema}; +use iceberg::spec::{PrimitiveType, Schema, SchemaVisitor, VariantType, visit_schema}; use iceberg::{Error, ErrorKind, Result}; type HiveSchema = Vec; @@ -139,6 +139,12 @@ impl SchemaVisitor for HiveSchemaBuilder { Ok(hive_type) } + + fn variant(&mut self, _v: &VariantType) -> Result { + // Match iceberg-java's HiveSchemaUtil, which maps VARIANT to "unknown" + // (apache/iceberg#15964). 
+ Ok("unknown".to_string()) + } } #[cfg(test)] diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index a88c91d625..5734caea83 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -1679,6 +1679,7 @@ pub iceberg::spec::Type::List(iceberg::spec::ListType) pub iceberg::spec::Type::Map(iceberg::spec::MapType) pub iceberg::spec::Type::Primitive(iceberg::spec::PrimitiveType) pub iceberg::spec::Type::Struct(iceberg::spec::StructType) +pub iceberg::spec::Type::Variant(iceberg::spec::VariantType) impl iceberg::spec::Type pub fn iceberg::spec::Type::as_primitive_type(&self) -> core::option::Option<&iceberg::spec::PrimitiveType> pub fn iceberg::spec::Type::decimal(precision: u32, scale: u32) -> iceberg::Result @@ -1688,6 +1689,8 @@ pub fn iceberg::spec::Type::is_floating_type(&self) -> bool pub fn iceberg::spec::Type::is_nested(&self) -> bool pub fn iceberg::spec::Type::is_primitive(&self) -> bool pub fn iceberg::spec::Type::is_struct(&self) -> bool +pub fn iceberg::spec::Type::is_variant(&self) -> bool +pub fn iceberg::spec::Type::min_format_version(&self) -> iceberg::spec::FormatVersion pub fn iceberg::spec::Type::to_struct_type(self) -> core::option::Option impl core::clone::Clone for iceberg::spec::Type pub fn iceberg::spec::Type::clone(&self) -> iceberg::spec::Type @@ -1702,6 +1705,8 @@ impl core::convert::From for iceberg::spec::Type pub fn iceberg::spec::Type::from(value: iceberg::spec::PrimitiveType) -> Self impl core::convert::From for iceberg::spec::Type pub fn iceberg::spec::Type::from(value: iceberg::spec::StructType) -> Self +impl core::convert::From for iceberg::spec::Type +pub fn iceberg::spec::Type::from(_: iceberg::spec::VariantType) -> Self impl core::fmt::Debug for iceberg::spec::Type pub fn iceberg::spec::Type::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl core::fmt::Display for iceberg::spec::Type @@ -2295,6 +2300,7 @@ impl iceberg::spec::Schema pub fn 
iceberg::spec::Schema::accessor_by_field_id(&self, field_id: i32) -> core::option::Option> pub fn iceberg::spec::Schema::as_struct(&self) -> &iceberg::spec::StructType pub fn iceberg::spec::Schema::builder() -> iceberg::spec::SchemaBuilder +pub fn iceberg::spec::Schema::check_format_compatibility(&self, format_version: iceberg::spec::FormatVersion) -> iceberg::Result<()> pub fn iceberg::spec::Schema::field_by_alias(&self, alias: &str) -> core::option::Option<&iceberg::spec::NestedFieldRef> pub fn iceberg::spec::Schema::field_by_id(&self, field_id: i32) -> core::option::Option<&iceberg::spec::NestedFieldRef> pub fn iceberg::spec::Schema::field_by_name(&self, field_name: &str) -> core::option::Option<&iceberg::spec::NestedFieldRef> @@ -2305,6 +2311,7 @@ pub fn iceberg::spec::Schema::field_id_to_name_map(&self) -> &std::collections:: pub fn iceberg::spec::Schema::highest_field_id(&self) -> i32 pub fn iceberg::spec::Schema::identifier_field_ids(&self) -> impl core::iter::traits::exact_size::ExactSizeIterator + '_ pub fn iceberg::spec::Schema::into_builder(self) -> iceberg::spec::SchemaBuilder +pub fn iceberg::spec::Schema::min_format_version(&self) -> iceberg::spec::FormatVersion pub fn iceberg::spec::Schema::name_by_field_id(&self, field_id: i32) -> core::option::Option<&str> pub fn iceberg::spec::Schema::schema_id(&self) -> iceberg::spec::SchemaId impl core::clone::Clone for iceberg::spec::Schema @@ -2802,6 +2809,26 @@ impl core::default::Default for iceberg::spec::UnboundPartitionSpecBuilder pub fn iceberg::spec::UnboundPartitionSpecBuilder::default() -> iceberg::spec::UnboundPartitionSpecBuilder impl core::fmt::Debug for iceberg::spec::UnboundPartitionSpecBuilder pub fn iceberg::spec::UnboundPartitionSpecBuilder::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub struct iceberg::spec::VariantType +impl core::clone::Clone for iceberg::spec::VariantType +pub fn iceberg::spec::VariantType::clone(&self) -> iceberg::spec::VariantType +impl 
core::cmp::Eq for iceberg::spec::VariantType +impl core::cmp::PartialEq for iceberg::spec::VariantType +pub fn iceberg::spec::VariantType::eq(&self, other: &iceberg::spec::VariantType) -> bool +impl core::convert::From for iceberg::spec::Type +pub fn iceberg::spec::Type::from(_: iceberg::spec::VariantType) -> Self +impl core::fmt::Debug for iceberg::spec::VariantType +pub fn iceberg::spec::VariantType::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::fmt::Display for iceberg::spec::VariantType +pub fn iceberg::spec::VariantType::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::hash::Hash for iceberg::spec::VariantType +pub fn iceberg::spec::VariantType::hash<__H: core::hash::Hasher>(&self, state: &mut __H) +impl core::marker::Copy for iceberg::spec::VariantType +impl core::marker::StructuralPartialEq for iceberg::spec::VariantType +impl serde_core::ser::Serialize for iceberg::spec::VariantType +pub fn iceberg::spec::VariantType::serialize(&self, serializer: S) -> core::result::Result<::Ok, ::Error> where S: serde_core::ser::Serializer +impl<'de> serde_core::de::Deserialize<'de> for iceberg::spec::VariantType +pub fn iceberg::spec::VariantType::deserialize(deserializer: D) -> core::result::Result::Error> where D: serde_core::de::Deserializer<'de> pub struct iceberg::spec::ViewMetadata impl iceberg::spec::ViewMetadata pub fn iceberg::spec::ViewMetadata::current_schema(&self) -> &iceberg::spec::SchemaRef @@ -2921,6 +2948,7 @@ pub const iceberg::spec::LIST_FIELD_NAME: &str pub const iceberg::spec::MAIN_BRANCH: &str pub const iceberg::spec::MAP_KEY_FIELD_NAME: &str pub const iceberg::spec::MAP_VALUE_FIELD_NAME: &str +pub const iceberg::spec::MIN_FORMAT_VERSION_DEFAULT_VALUES: iceberg::spec::FormatVersion pub const iceberg::spec::MIN_FORMAT_VERSION_ROW_LINEAGE: iceberg::spec::FormatVersion pub const iceberg::spec::SCHEMA_NAME_DELIMITER: &str pub const iceberg::spec::UNASSIGNED_SEQUENCE_NUMBER: i64 @@ -2956,6 
+2984,7 @@ pub fn iceberg::spec::SchemaVisitor::map(&mut self, map: &iceberg::spec::MapType pub fn iceberg::spec::SchemaVisitor::primitive(&mut self, p: &iceberg::spec::PrimitiveType) -> iceberg::Result pub fn iceberg::spec::SchemaVisitor::schema(&mut self, schema: &iceberg::spec::Schema, value: Self::T) -> iceberg::Result pub fn iceberg::spec::SchemaVisitor::struct(&mut self, struct: &iceberg::spec::StructType, results: alloc::vec::Vec) -> iceberg::Result +pub fn iceberg::spec::SchemaVisitor::variant(&mut self, _v: &iceberg::spec::VariantType) -> iceberg::Result pub trait iceberg::spec::SchemaWithPartnerVisitor

pub type iceberg::spec::SchemaWithPartnerVisitor::T pub fn iceberg::spec::SchemaWithPartnerVisitor::after_list_element(&mut self, _field: &iceberg::spec::NestedFieldRef, _partner: &P) -> iceberg::Result<()> @@ -2972,6 +3001,7 @@ pub fn iceberg::spec::SchemaWithPartnerVisitor::map(&mut self, map: &iceberg::sp pub fn iceberg::spec::SchemaWithPartnerVisitor::primitive(&mut self, p: &iceberg::spec::PrimitiveType, partner: &P) -> iceberg::Result pub fn iceberg::spec::SchemaWithPartnerVisitor::schema(&mut self, schema: &iceberg::spec::Schema, partner: &P, value: Self::T) -> iceberg::Result pub fn iceberg::spec::SchemaWithPartnerVisitor::struct(&mut self, struct: &iceberg::spec::StructType, partner: &P, results: alloc::vec::Vec) -> iceberg::Result +pub fn iceberg::spec::SchemaWithPartnerVisitor::variant(&mut self, _v: &iceberg::spec::VariantType, _partner: &P) -> iceberg::Result pub fn iceberg::spec::deserialize_data_file_from_json(json: &str, partition_spec_id: i32, partition_type: &iceberg::spec::StructType, schema: &iceberg::spec::Schema) -> iceberg::Result pub fn iceberg::spec::prune_columns(schema: &iceberg::spec::Schema, selected: impl core::iter::traits::collect::IntoIterator, select_full_types: bool) -> iceberg::Result pub fn iceberg::spec::read_data_files_from_avro(reader: &mut R, schema: &iceberg::spec::Schema, partition_spec_id: i32, partition_type: &iceberg::spec::StructType, version: iceberg::spec::FormatVersion) -> iceberg::Result> diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 81e92fcb98..8ebf10826f 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -35,7 +35,7 @@ use crate::runtime::Runtime; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; use crate::spec::{ DataContentType, Datum, ListType, MapType, NestedField, NestedFieldRef, PartnerAccessor, - PrimitiveType, Schema, 
SchemaRef, SchemaWithPartnerVisitor, StructType, Type, + PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, StructType, Type, VariantType, visit_schema_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -544,6 +544,10 @@ impl SchemaWithPartnerVisitor for EqDelColumnProcessor<'_> { fn primitive(&mut self, _primitive: &PrimitiveType, _partner: &ArrayRef) -> Result<()> { Ok(()) } + + fn variant(&mut self, _v: &VariantType, _partner: &ArrayRef) -> Result<()> { + Ok(()) + } } struct EqDelRecordBatchPartnerAccessor; diff --git a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs index e514457887..d01f3e9e56 100644 --- a/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs +++ b/crates/iceberg/src/arrow/nan_val_cnt_visitor.rs @@ -28,7 +28,7 @@ use crate::Result; use crate::arrow::{ArrowArrayAccessor, FieldMatchMode}; use crate::spec::{ ListType, MapType, NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaWithPartnerVisitor, - StructType, visit_struct_with_partner, + StructType, VariantType, visit_struct_with_partner, }; macro_rules! 
cast_and_update_cnt_map { @@ -122,6 +122,10 @@ impl SchemaWithPartnerVisitor for NanValueCountVisitor { Ok(()) } + fn variant(&mut self, _v: &VariantType, _col: &ArrayRef) -> Result { + Ok(()) + } + fn after_struct_field(&mut self, field: &NestedFieldRef, partner: &ArrayRef) -> Result<()> { let field_id = field.id; count_float_nans!(partner, self, field_id); diff --git a/crates/iceberg/src/arrow/reader/projection.rs b/crates/iceberg/src/arrow/reader/projection.rs index 2589c78366..0d75adcf5a 100644 --- a/crates/iceberg/src/arrow/reader/projection.rs +++ b/crates/iceberg/src/arrow/reader/projection.rs @@ -23,7 +23,7 @@ use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; -use arrow_schema::{Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use arrow_schema::{DataType, Field, FieldRef, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ProjectionMask}; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; @@ -76,7 +76,111 @@ impl ArrowReader { Self::include_leaf_field_id(&map_type.key_field, field_ids); Self::include_leaf_field_id(&map_type.value_field, field_ids); } + // Variant is a leaf type for Parquet projection purposes (like a primitive). + Type::Variant(_) => { + field_ids.push(field.id); + } + } + } + + /// Recursive DFS over an Arrow `Fields` tree whose leaf numbering matches + /// `arrow_schema::Fields::filter_leaves`. For every leaf sitting inside a + /// variant column, stores `leaf_idx → variant_field_id` in `out`. A + /// "variant column" is any Arrow field whose embedded Parquet field id + /// resolves to `Type::Variant` in the Iceberg schema — including variants + /// nested inside a struct, list, or map. 
+ fn collect_variant_leaves( + fields: &arrow_schema::Fields, + leaf_idx: &mut usize, + variant_parent: Option, + iceberg_schema: &Schema, + leaf_field_id_set: &HashSet, + out: &mut HashMap, + ) -> Result<()> { + for field in fields { + Self::collect_variant_leaves_in_field( + field, + leaf_idx, + variant_parent, + iceberg_schema, + leaf_field_id_set, + out, + )?; } + Ok(()) + } + + fn collect_variant_leaves_in_field( + field: &FieldRef, + leaf_idx: &mut usize, + variant_parent: Option, + iceberg_schema: &Schema, + leaf_field_id_set: &HashSet, + out: &mut HashMap, + ) -> Result<()> { + // Once we are inside a variant, stay inside; otherwise check + // whether this Arrow field IS a variant column. + let entering_variant = variant_parent.is_none(); + let effective_variant = variant_parent.or_else(|| { + let fid = field + .metadata() + .get(PARQUET_FIELD_ID_META_KEY) + .and_then(|s| i32::from_str(s).ok())?; + if !leaf_field_id_set.contains(&fid) { + return None; + } + let iceberg_field = iceberg_schema.field_by_id(fid)?; + matches!(iceberg_field.field_type.as_ref(), Type::Variant(_)).then_some(fid) + }); + + // Reject shredded variants: a `typed_value` sub-field means the payload is + // shredded, which we can't reconstruct yet. Projecting only metadata/value + // would silently drop it, so fail loudly instead. + if entering_variant + && effective_variant.is_some() + && let DataType::Struct(sub) = field.data_type() + && sub.iter().any(|f| f.name() == "typed_value") + { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Reading shredded variant columns is not supported yet: found a \ + `typed_value` sub-field. 
Only unshredded variants (metadata + value) \ + can be read.", + )); + } + + match field.data_type() { + DataType::Struct(sub) => { + Self::collect_variant_leaves( + sub, + leaf_idx, + effective_variant, + iceberg_schema, + leaf_field_id_set, + out, + )?; + } + DataType::List(inner) + | DataType::LargeList(inner) + | DataType::FixedSizeList(inner, _) + | DataType::Map(inner, _) => { + Self::collect_variant_leaves_in_field( + inner, + leaf_idx, + effective_variant, + iceberg_schema, + leaf_field_id_set, + out, + )?; + } + _ => { + if let Some(vid) = effective_variant { + out.insert(*leaf_idx, vid); + } + *leaf_idx += 1; + } + } + Ok(()) } pub(super) fn get_arrow_projection_mask( @@ -148,8 +252,38 @@ impl ArrowReader { arrow_schema: &ArrowSchemaRef, type_promotion_is_valid: fn(Option<&PrimitiveType>, Option<&PrimitiveType>) -> bool, ) -> Result { - let mut column_map = HashMap::new(); + // Maps field_id → leaf column indices. Vec because variant contributes two + // leaves (metadata + value) under a single field ID. + let mut column_map: HashMap> = HashMap::new(); let fields = arrow_schema.fields(); + // HashSet for O(1) membership checks instead of O(n) slice scans. + let leaf_field_id_set: HashSet = leaf_field_ids.iter().copied().collect(); + + // Variant fields are an Iceberg leaf type but a Parquet GROUP. Their + // sub-fields (metadata, value, and any shredded children) carry no + // embedded field IDs — only the parent group has the field ID. + // `filter_leaves` therefore never finds them via the standard field-ID + // scan below. + // + // Java's PruneColumns.variant() returns the variant group unchanged, so + // every leaf beneath it is projected as part of the variant column. We + // replicate that here with a recursive DFS over the Arrow schema whose + // leaf-numbering matches `Fields::filter_leaves`. For each Arrow leaf + // index that sits inside any variant (top-level OR nested inside a + // struct/list/map), we record the enclosing variant's field id. 
+ let variant_leaves: HashMap = { + let mut out = HashMap::new(); + let mut leaf_idx: usize = 0; + Self::collect_variant_leaves( + fields, + &mut leaf_idx, + None, + iceberg_schema_of_task, + &leaf_field_id_set, + &mut out, + )?; + out + }; // Pre-project only the fields that have been selected, possibly avoiding converting // some Arrow types that are not yet supported. @@ -161,7 +295,7 @@ impl ArrowReader { .and_then(|field_id| i32::from_str(field_id).ok()) .is_some_and(|field_id| { projected_fields.insert((*f).clone(), field_id); - leaf_field_ids.contains(&field_id) + leaf_field_id_set.contains(&field_id) }) }), arrow_schema.metadata().clone(), @@ -169,6 +303,14 @@ impl ArrowReader { let iceberg_schema = arrow_schema_to_schema(&projected_arrow_schema)?; fields.filter_leaves(|idx, field| { + // Variant sub-fields: parent group carries the field ID, not the leaf. + // Skip type-promotion check — Type::Variant is not a primitive type + // (matches Java's PruneColumns.variant() which returns the group unchanged). + if let Some(&variant_field_id) = variant_leaves.get(&idx) { + column_map.entry(variant_field_id).or_default().push(idx); + return true; + } + let Some(field_id) = projected_fields.get(field).cloned() else { return false; }; @@ -190,7 +332,7 @@ impl ArrowReader { return false; } - column_map.insert(field_id, idx); + column_map.entry(field_id).or_default().push(idx); true }); @@ -198,8 +340,8 @@ impl ArrowReader { // We only project existing columns; RecordBatchTransformer adds default/NULL values. 
let mut indices = vec![]; for field_id in leaf_field_ids { - if let Some(col_idx) = column_map.get(field_id) { - indices.push(*col_idx); + if let Some(col_indices) = column_map.get(field_id) { + indices.extend_from_slice(col_indices); } } @@ -435,7 +577,9 @@ mod tests { use crate::expr::{Bind, Reference}; use crate::io::FileIO; use crate::scan::{FileScanTask, FileScanTaskStream}; - use crate::spec::{DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type}; + use crate::spec::{ + DataFileFormat, Datum, NestedField, PrimitiveType, Schema, Type, VariantType, + }; use crate::{ErrorKind, Runtime}; #[test] @@ -530,6 +674,284 @@ message schema { assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0])); } + /// Variant fields are an Iceberg leaf type but a Parquet GROUP whose sub-fields carry + /// no embedded field IDs. The projection mask must include both metadata and value + /// leaves when the variant field ID is requested, and must not drop the variant when + /// it is projected alongside ordinary primitive columns. + #[test] + fn test_arrow_projection_mask_variant() { + // Iceberg schema: c1 (String, id=1) + v (Variant, id=2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "v", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(), + ); + + // Arrow schema: c1 with field ID 1; v as Struct(metadata: Binary, value: Binary) + // with field ID 2 on the struct but NO field IDs on the sub-fields — that is the + // Parquet variant wire format. 
+ let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new( + "v", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new("value", DataType::Binary, false)), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + // Parquet message: c1 is leaf 0; variant sub-fields metadata=leaf 1, value=leaf 2. + // No field IDs on sub-leaves — matching the real Iceberg/Spark-written variant format. + let message_type = " +message schema { + required binary c1 (STRING) = 1; + required group v = 2 { + required binary metadata; + required binary value; + } +} +"; + let parquet_type = parse_message_type(message_type).expect("should parse schema"); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type)); + + // Both fields: all three leaves must be included. + let mask = ArrowReader::get_arrow_projection_mask( + &[1, 2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for c1 + v"); + assert_eq!(mask, ProjectionMask::leaves(&parquet_schema, vec![0, 1, 2])); + + // Variant only: leaves 1 (metadata) and 2 (value) must be included. + let mask_variant_only = ArrowReader::get_arrow_projection_mask( + &[2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for v only"); + assert_eq!( + mask_variant_only, + ProjectionMask::leaves(&parquet_schema, vec![1, 2]), + ); + + // Primitive only: leaf 0 (c1) must be included; variant NOT included. 
+ let mask_primitive_only = ArrowReader::get_arrow_projection_mask( + &[1], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for c1 only"); + assert_eq!( + mask_primitive_only, + ProjectionMask::leaves(&parquet_schema, vec![0]), + ); + } + + /// A shredded variant (with `typed_value`) must be rejected, not silently dropped. + #[test] + fn test_arrow_projection_mask_variant_shredded_is_rejected() { + // Iceberg schema: c1 (String, id=1) + v (Variant, id=2) + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "c1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "v", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(), + ); + + // Arrow schema: v is a SHREDDED variant — Struct(metadata, value, typed_value), + // field ID 2 on the struct, no field IDs on the sub-fields. + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("c1", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + Field::new( + "v", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new("value", DataType::Binary, true)), + Arc::new(Field::new("typed_value", DataType::Int64, true)), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ])); + + let message_type = " +message schema { + required binary c1 (STRING) = 1; + required group v = 2 { + required binary metadata; + optional binary value; + optional int64 typed_value; + } +} +"; + let parquet_type = parse_message_type(message_type).expect("should parse schema"); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type)); + + let err = ArrowReader::get_arrow_projection_mask( + &[1, 2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect_err("shredded 
variant must be rejected"); + assert_eq!(err.kind(), ErrorKind::FeatureUnsupported); + assert!( + err.message().contains("shredded variant"), + "unexpected error message: {err}" + ); + } + + /// variant nested inside a struct must also have its sub-leaves + /// included in the projection mask. + #[test] + fn test_arrow_projection_mask_variant_nested_in_struct() { + // Iceberg schema: parent struct (id=1) containing c2 String (id=2) and + // v Variant (id=3). + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required( + 1, + "parent", + Type::Struct(crate::spec::StructType::new(vec![ + NestedField::required(2, "c2", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(3, "v", Type::Variant(VariantType)).into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + // Arrow: parent struct (id=1) → [c2 (id=2), v struct(metadata,value) (id=3)]. + // Variant sub-fields intentionally without field IDs. + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "parent", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new( + Field::new("c2", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "2".to_string(), + )])), + ), + Arc::new( + Field::new( + "v", + DataType::Struct(arrow_schema::Fields::from(vec![ + Arc::new(Field::new("metadata", DataType::Binary, false)), + Arc::new(Field::new("value", DataType::Binary, false)), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "3".to_string(), + )])), + ), + ])), + false, + ) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + // Parquet: parent.c2 = leaf 0; parent.v.metadata = leaf 1; parent.v.value = leaf 2. 
+ let message_type = " +message schema { + required group parent = 1 { + required binary c2 (STRING) = 2; + required group v = 3 { + required binary metadata; + required binary value; + } + } +} +"; + let parquet_type = parse_message_type(message_type).expect("should parse schema"); + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_type)); + + // Projecting the nested variant must include both of its leaves. + let mask_variant = ArrowReader::get_arrow_projection_mask( + &[3], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for nested variant"); + assert_eq!( + mask_variant, + ProjectionMask::leaves(&parquet_schema, vec![1, 2]), + "variant nested in a struct was dropped from projection" + ); + + // Projecting the sibling primitive must not include variant leaves. + let mask_primitive = ArrowReader::get_arrow_projection_mask( + &[2], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for nested primitive"); + assert_eq!( + mask_primitive, + ProjectionMask::leaves(&parquet_schema, vec![0]), + ); + + // Projecting both must include all three leaves. + let mask_both = ArrowReader::get_arrow_projection_mask( + &[2, 3], + &schema, + &parquet_schema, + &arrow_schema, + false, + ) + .expect("projection mask for nested primitive + variant"); + assert_eq!( + mask_both, + ProjectionMask::leaves(&parquet_schema, vec![0, 1, 2]), + ); + } + /// Test schema evolution: reading old Parquet file (with only column 'a') /// using a newer table schema (with columns 'a' and 'b'). 
/// This tests that: diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index e6649d926d..b3bc98ab21 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -35,7 +35,7 @@ use crate::error::Result; use crate::spec::decimal_utils::i128_from_be_bytes; use crate::spec::{ Datum, FIRST_FIELD_ID, ListType, MapType, NestedField, NestedFieldRef, PrimitiveLiteral, - PrimitiveType, Schema, SchemaVisitor, StructType, Type, + PrimitiveType, Schema, SchemaVisitor, StructType, Type, VariantType, }; use crate::{Error, ErrorKind}; @@ -692,6 +692,16 @@ impl SchemaVisitor for ToArrowSchemaConverter { } } } + + fn variant(&mut self, _v: &VariantType) -> crate::Result { + // Variant is stored as a struct with two required binary fields (no field IDs on sub-fields). + // Uses Binary (not LargeBinary) matching the Parquet BINARY primitive directly. + let metadata_field = Field::new("metadata", DataType::Binary, false); + let value_field = Field::new("value", DataType::Binary, false); + Ok(ArrowSchemaOrFieldOrType::Type(DataType::Struct( + vec![metadata_field, value_field].into(), + ))) + } } /// Convert iceberg schema to an arrow schema. 
@@ -1705,6 +1715,15 @@ mod tests { simple_field("map", map, false, "16"), simple_field("struct", r#struct, false, "17"), simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"), + simple_field( + "v", + DataType::Struct(Fields::from(vec![ + Field::new("metadata", DataType::Binary, false), + Field::new("value", DataType::Binary, false), + ])), + true, + "31", + ), ]) } @@ -1888,6 +1907,12 @@ mod tests { "name":"uuid", "required":true, "type":"uuid" + }, + { + "id":31, + "name":"v", + "required":false, + "type":"variant" } ], "identifier-field-ids":[] diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index d07233c420..c76233820f 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -30,7 +30,7 @@ use uuid::Uuid; use super::get_field_id_from_metadata; use crate::spec::{ ListType, Literal, Map, MapType, NestedField, PartnerAccessor, PrimitiveLiteral, PrimitiveType, - SchemaWithPartnerVisitor, Struct, StructType, Type, visit_struct_with_partner, + SchemaWithPartnerVisitor, Struct, StructType, Type, VariantType, visit_struct_with_partner, visit_type_with_partner, }; use crate::{Error, ErrorKind, Result}; @@ -426,6 +426,13 @@ impl SchemaWithPartnerVisitor for ArrowArrayToIcebergStructConverter { } } } + + fn variant(&mut self, _v: &VariantType, _partner: &ArrayRef) -> Result>> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Converting variant Arrow array to Iceberg literal is not supported yet", + )) + } } /// Defines how Arrow fields are matched with Iceberg fields when converting data. 
diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs index fdbc680977..1002ecdf5d 100644 --- a/crates/iceberg/src/avro/schema.rs +++ b/crates/iceberg/src/avro/schema.rs @@ -28,7 +28,7 @@ use serde_json::{Number, Value}; use crate::spec::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, Schema, SchemaVisitor, - StructType, Type, visit_schema, + StructType, Type, VariantType, visit_schema, }; use crate::{Error, ErrorKind, Result, ensure_data_valid}; @@ -37,11 +37,15 @@ const FIELD_ID_PROP: &str = "field-id"; const KEY_ID: &str = "key-id"; const VALUE_ID: &str = "value-id"; const MAP_LOGICAL_TYPE: &str = "map"; +const VARIANT_LOGICAL_TYPE: &str = "variant"; // This const may better to maintain in avro-rs. const LOGICAL_TYPE: &str = "logicalType"; struct SchemaToAvroSchema { schema: String, + // Stack of enclosing field ids, used to derive unique record names for + // structural types (e.g. variant) — mirrors Java TypeToSchema.fieldIds. + field_ids: Vec, } type AvroSchemaOrField = Either; @@ -49,6 +53,39 @@ type AvroSchemaOrField = Either; impl SchemaVisitor for SchemaToAvroSchema { type T = AvroSchemaOrField; + fn before_struct_field(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn after_struct_field(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn before_list_element(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn after_list_element(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn before_map_key(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn after_map_key(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn before_map_value(&mut self, field: &NestedFieldRef) -> Result<()> { + self.field_ids.push(field.id); + Ok(()) + } + fn 
after_map_value(&mut self, _field: &NestedFieldRef) -> Result<()> { + self.field_ids.pop(); + Ok(()) + } + fn schema(&mut self, _schema: &Schema, value: AvroSchemaOrField) -> Result { let mut avro_schema = value.unwrap_left(); @@ -220,6 +257,47 @@ impl SchemaVisitor for SchemaToAvroSchema { } } + fn variant(&mut self, _v: &VariantType) -> Result { + let fields = vec![ + AvroRecordField { + name: "metadata".to_string(), + schema: AvroSchema::Bytes, + order: RecordFieldOrder::Ignore, + position: 0, + doc: None, + aliases: None, + default: None, + custom_attributes: Default::default(), + }, + AvroRecordField { + name: "value".to_string(), + schema: AvroSchema::Bytes, + order: RecordFieldOrder::Ignore, + position: 1, + doc: None, + aliases: None, + default: None, + custom_attributes: Default::default(), + }, + ]; + // Avro record names must be unique within a schema. Derive the name from the + // enclosing field id. + let record_name = match self.field_ids.last() { + Some(id) => format!("r{id}"), + // falling back to "variant" when no enclosing id is set. 
+ None => VARIANT_LOGICAL_TYPE.to_string(), + }; + let mut schema = avro_record_schema(&record_name, fields)?; + let AvroSchema::Record(record) = &mut schema else { + unreachable!("avro_record_schema must return AvroSchema::Record"); + }; + record.attributes.insert( + LOGICAL_TYPE.to_string(), + Value::String(VARIANT_LOGICAL_TYPE.to_string()), + ); + Ok(Either::Left(schema)) + } + fn primitive(&mut self, p: &PrimitiveType) -> Result { let avro_schema = match p { PrimitiveType::Boolean => AvroSchema::Boolean, @@ -249,6 +327,7 @@ impl SchemaVisitor for SchemaToAvroSchema { pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Result { let mut converter = SchemaToAvroSchema { schema: name.to_string(), + field_ids: Vec::new(), }; visit_schema(schema, &mut converter).map(Either::unwrap_left) @@ -435,6 +514,13 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { record: &RecordSchema, field_types: Vec>, ) -> Result> { + // A variant is encoded as a record with logicalType "variant" — return it directly + // rather than trying to build a struct from its metadata/value byte fields. 
+ if record.attributes.get(LOGICAL_TYPE).and_then(Value::as_str) == Some(VARIANT_LOGICAL_TYPE) + { + return Ok(Some(Type::Variant(VariantType))); + } + let mut fields = Vec::with_capacity(field_types.len()); for (avro_field, field_type) in record.fields.iter().zip_eq(field_types) { let field_id = @@ -614,7 +700,9 @@ mod tests { use super::*; use crate::avro::schema::AvroSchemaToSchema; - use crate::spec::{ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type}; + use crate::spec::{ + ListType, MapType, NestedField, PrimitiveType, Schema, StructType, Type, VariantType, + }; fn read_test_data_file_to_avro_schema(filename: &str) -> AvroSchema { let input = read_to_string(format!( @@ -1212,4 +1300,163 @@ mod tests { converter.primitive(&avro_schema).unwrap().unwrap() ); } + + /// Adapted from Java TestSchemaConversions.testVariantConversion + #[test] + fn test_variant_schema_conversion() { + let avro_schema = AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "test_schema", + "fields": [ + { + "name": "variantCol1", + "type": { + "type": "record", + "name": "r1", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + }, + "field-id": 1 + }, + { + "name": "variantCol2", + "type": { + "type": "record", + "name": "r2", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + }, + "field-id": 2 + } + ] +} + "#, + ) + .unwrap(); + + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "variantCol1", Type::Variant(VariantType)).into(), + NestedField::required(2, "variantCol2", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(); + + check_schema_conversion(avro_schema, iceberg_schema); + } + + #[test] + fn test_optional_variant_schema_conversion() { + let avro_schema = AvroSchema::parse_str( + r#" +{ + "type": "record", + "name": "test_schema", + "fields": [ + { + 
"name": "v", + "type": [ + "null", + { + "type": "record", + "name": "r1", + "logicalType": "variant", + "fields": [ + {"name": "metadata", "type": "bytes"}, + {"name": "value", "type": "bytes"} + ] + } + ], + "default": null, + "field-id": 1 + } + ] +} + "#, + ) + .unwrap(); + + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::optional(1, "v", Type::Variant(VariantType)).into(), + ]) + .build() + .unwrap(); + + check_schema_conversion(avro_schema, iceberg_schema); + } + + /// Regression: two variant columns nested inside a struct must produce unique + /// Avro record names (`r{field_id}`). + #[test] + fn test_multiple_variants_in_struct_have_unique_record_names() { + let iceberg_schema = Schema::builder() + .with_fields(vec![ + NestedField::required( + 1, + "nested", + Type::Struct(StructType::new(vec![ + NestedField::required(2, "v1", Type::Variant(VariantType)).into(), + NestedField::required(3, "v2", Type::Variant(VariantType)).into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(); + + let avro_schema = schema_to_avro_schema("test", &iceberg_schema).unwrap(); + + // Collect every variant record's name from the resulting Avro schema + // and assert they are unique and match the enclosing field ids. 
+ let json = serde_json::to_value(&avro_schema).unwrap(); + fn walk(v: &Value, out: &mut Vec) { + match v { + Value::Object(map) => { + if map.get("logicalType").and_then(Value::as_str) == Some("variant") + && let Some(name) = map.get("name").and_then(Value::as_str) + { + out.push(name.to_string()); + } + for (_k, child) in map { + walk(child, out); + } + } + Value::Array(arr) => { + for child in arr { + walk(child, out); + } + } + _ => {} + } + } + let mut variant_record_names: Vec = Vec::new(); + walk(&json, &mut variant_record_names); + + assert_eq!( + variant_record_names.len(), + 2, + "expected two variant records, got {variant_record_names:?}" + ); + assert!( + variant_record_names.contains(&"r2".to_string()), + "expected variant record 'r2' (field id 2), got {variant_record_names:?}" + ); + assert!( + variant_record_names.contains(&"r3".to_string()), + "expected variant record 'r3' (field id 3), got {variant_record_names:?}" + ); + + // Round-trips back to the same Iceberg schema. + let roundtrip = avro_schema_to_schema(&avro_schema).unwrap(); + assert_eq!(iceberg_schema, roundtrip); + } } diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index ad4aea758f..21f7b4709a 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -32,8 +32,8 @@ use serde_json::Value as JsonValue; use super::values::Literal; use crate::ensure_data_valid; use crate::error::Result; -use crate::spec::PrimitiveLiteral; use crate::spec::datatypes::_decimal::{MAX_PRECISION, REQUIRED_LENGTH}; +use crate::spec::{FormatVersion, PrimitiveLiteral}; /// Field name for list type. 
pub const LIST_FIELD_NAME: &str = "element"; @@ -90,6 +90,8 @@ pub enum Type { List(ListType), /// Map type Map(MapType), + /// Variant Type + Variant(VariantType), } impl fmt::Display for Type { @@ -99,6 +101,7 @@ impl fmt::Display for Type { Type::Struct(s) => write!(f, "{s}"), Type::List(_) => write!(f, "list"), Type::Map(_) => write!(f, "map"), + Type::Variant(_) => write!(f, "variant"), } } } @@ -122,6 +125,12 @@ impl Type { matches!(self, Type::Struct(_) | Type::List(_) | Type::Map(_)) } + /// Whether the type is variant type. + #[inline(always)] + pub fn is_variant(&self) -> bool { + matches!(self, Type::Variant(_)) + } + /// Convert Type to reference of PrimitiveType pub fn as_primitive_type(&self) -> Option<&PrimitiveType> { if let Type::Primitive(primitive_type) = self { @@ -178,6 +187,31 @@ impl Type { Type::Primitive(PrimitiveType::Float) | Type::Primitive(PrimitiveType::Double) ) } + + /// Returns the minimum format version required for the type. + /// + /// Recurses into nested types, so a struct/list/map returns the highest + /// minimum version among its fields + pub fn min_format_version(&self) -> FormatVersion { + match self { + Type::Primitive(PrimitiveType::TimestampNs) + | Type::Primitive(PrimitiveType::TimestamptzNs) + | Type::Variant(_) => FormatVersion::V3, + Type::Primitive(_) => FormatVersion::V1, + Type::Struct(s) => s + .fields() + .iter() + .map(|f| f.field_type.min_format_version()) + .max() + .unwrap_or(FormatVersion::V1), + Type::List(l) => l.element_field.field_type.min_format_version(), + Type::Map(m) => m + .key_field + .field_type + .min_format_version() + .max(m.value_field.field_type.min_format_version()), + } + } } impl From for Type { @@ -710,6 +744,7 @@ pub(super) mod _serde { use crate::spec::datatypes::Type::Map; use crate::spec::datatypes::{ ListType, MapType, NestedField, NestedFieldRef, PrimitiveType, StructType, Type, + VariantType, }; /// List type for serialization and deserialization @@ -737,6 +772,7 @@ pub(super) 
mod _serde { value: Cow<'a, Type>, }, Primitive(PrimitiveType), + Variant(VariantType), } impl From> for Type { @@ -775,6 +811,7 @@ pub(super) mod _serde { Self::Struct(StructType::new(fields.into_owned())) } SerdeType::Primitive(p) => Self::Primitive(p), + SerdeType::Variant(v) => Self::Variant(v), } } } @@ -801,6 +838,7 @@ pub(super) mod _serde { fields: Cow::Borrowed(&s.fields), }, Type::Primitive(p) => SerdeType::Primitive(p.clone()), + Type::Variant(v) => SerdeType::Variant(*v), } } } @@ -844,6 +882,42 @@ impl MapType { } } +/// Variant type - can hold semi-structured data of any type. +/// This is an Iceberg V3 feature. +#[derive(Debug, PartialEq, Eq, Clone, Copy, Hash)] +pub struct VariantType; + +impl fmt::Display for VariantType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "variant") + } +} + +impl From for Type { + fn from(_: VariantType) -> Self { + Type::Variant(VariantType) + } +} + +impl Serialize for VariantType { + fn serialize(&self, serializer: S) -> std::result::Result + where S: Serializer { + serializer.serialize_str("variant") + } +} + +impl<'de> Deserialize<'de> for VariantType { + fn deserialize(deserializer: D) -> std::result::Result + where D: Deserializer<'de> { + let s = String::deserialize(deserializer)?; + if s == "variant" { + Ok(VariantType) + } else { + Err(D::Error::custom(format!("expected 'variant', got '{s}'"))) + } + } +} + #[cfg(test)] mod tests { use pretty_assertions::assert_eq; @@ -1276,6 +1350,47 @@ mod tests { } } + #[test] + fn variant_type_serde() { + let json = r#"{"id": 1, "name": "v", "required": true, "type": "variant"}"#; + let field: NestedField = serde_json::from_str(json).unwrap(); + assert_eq!(*field.field_type, Type::Variant(VariantType)); + + let serialized = serde_json::to_string(&field).unwrap(); + let roundtrip: NestedField = serde_json::from_str(&serialized).unwrap(); + assert_eq!(field, roundtrip); + } + + #[test] + fn min_format_version_recurses_into_nested_types() { + // Plain 
primitives stay at V1. + assert_eq!( + Type::Primitive(PrimitiveType::String).min_format_version(), + FormatVersion::V1 + ); + + // A struct containing a Variant reports V3 (recursion, not just the + // top-level kind). + let struct_with_variant = Type::Struct(StructType::new(vec![ + NestedField::required(1, "a", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "v", Type::Variant(VariantType)).into(), + ])); + assert_eq!(struct_with_variant.min_format_version(), FormatVersion::V3); + + // A list whose element is a v3-only type reports V3. + let list_of_ts_ns = Type::List(ListType::new( + NestedField::required(3, "element", Type::Primitive(PrimitiveType::TimestampNs)).into(), + )); + assert_eq!(list_of_ts_ns.min_format_version(), FormatVersion::V3); + + // A map with all-V1 fields stays V1. + let v1_map = Type::Map(MapType::new( + NestedField::required(4, "key", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(5, "value", Type::Primitive(PrimitiveType::Long)).into(), + )); + assert_eq!(v1_map.min_format_version(), FormatVersion::V1); + } + #[test] fn struct_type_with_type_field() { // Test that StructType properly deserializes JSON with "type":"struct" field diff --git a/crates/iceberg/src/spec/schema/id_reassigner.rs b/crates/iceberg/src/spec/schema/id_reassigner.rs index 5dbb370001..72caf30121 100644 --- a/crates/iceberg/src/spec/schema/id_reassigner.rs +++ b/crates/iceberg/src/spec/schema/id_reassigner.rs @@ -102,6 +102,7 @@ impl ReassignFieldIds { value_field: Arc::new(value_field), })) } + Type::Variant(v) => Ok(Type::Variant(v)), } } diff --git a/crates/iceberg/src/spec/schema/index.rs b/crates/iceberg/src/spec/schema/index.rs index d4e77ab2aa..0633ca0aa9 100644 --- a/crates/iceberg/src/spec/schema/index.rs +++ b/crates/iceberg/src/spec/schema/index.rs @@ -17,6 +17,7 @@ use super::utils::try_insert_field; use super::*; +use crate::spec::VariantType; /// Creates a field id to field map. 
pub fn index_by_id(r#struct: &StructType) -> Result> { @@ -53,6 +54,10 @@ pub fn index_by_id(r#struct: &StructType) -> Result fn primitive(&mut self, _: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &VariantType) -> Result { + Ok(()) + } } let mut index = IndexById(HashMap::new()); @@ -145,6 +150,10 @@ pub fn index_parents(r#struct: &StructType) -> Result> { fn primitive(&mut self, _p: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &VariantType) -> Result { + Ok(()) + } } let mut index = IndexByParent { @@ -293,6 +302,10 @@ impl SchemaVisitor for IndexByName { fn primitive(&mut self, _p: &PrimitiveType) -> Result { Ok(()) } + + fn variant(&mut self, _v: &VariantType) -> Result { + Ok(()) + } } #[cfg(test)] diff --git a/crates/iceberg/src/spec/schema/mod.rs b/crates/iceberg/src/spec/schema/mod.rs index 9109990e19..7dd0d7cd46 100644 --- a/crates/iceberg/src/spec/schema/mod.rs +++ b/crates/iceberg/src/spec/schema/mod.rs @@ -39,6 +39,7 @@ pub use self::prune_columns::prune_columns; use super::NestedField; use crate::error::Result; use crate::expr::accessor::StructAccessor; +use crate::spec::FormatVersion; use crate::spec::datatypes::{ LIST_FIELD_NAME, ListType, MAP_KEY_FIELD_NAME, MAP_VALUE_FIELD_NAME, MapType, NestedFieldRef, PrimitiveType, StructType, Type, @@ -53,6 +54,9 @@ pub type SchemaRef = Arc; pub const DEFAULT_SCHEMA_ID: SchemaId = 0; /// Delimiter for schema name, which denotes a nested struct. pub const SCHEMA_NAME_DELIMITER: &str = "."; +/// Minimum format version that allows non-null field default values. +/// Mirrors Java's `Schema.DEFAULT_VALUES_MIN_FORMAT_VERSION`. +pub const MIN_FORMAT_VERSION_DEFAULT_VALUES: FormatVersion = FormatVersion::V3; /// Defines schema in iceberg. 
#[derive(Debug, Serialize, Deserialize, Clone)] @@ -421,6 +425,67 @@ impl Schema { pub fn field_id_to_fields(&self) -> &HashMap { &self.id_to_field } + + /// Returns the minimum [`FormatVersion`] required to represent all types in this schema. + /// + /// Defaults to `FormatVersion::V1` if all types are universally supported. + pub fn min_format_version(&self) -> FormatVersion { + self.as_struct() + .fields() + .iter() + .map(|f| f.field_type.min_format_version()) + .max() + .unwrap_or(FormatVersion::V1) + } + + /// Check that all types in this schema are supported by the given format version. + /// + /// Mirrors Java's `Schema.checkCompatibility()`; returns an error listing every + /// incompatible field. Two checks per field: + /// + /// - **Type**: `TimestampNs` / `TimestamptzNs` / `Variant` require v3+. + /// - **Initial default**: a non-null `initial_default` requires + /// [`MIN_FORMAT_VERSION_DEFAULT_VALUES`] — it backfills pre-existing rows, + /// which older readers can't honor. `write_default` only affects newly + /// written rows (physically materialized, read the same at any version), so + /// it is not checked. + pub fn check_format_compatibility(&self, format_version: FormatVersion) -> Result<()> { + let mut problems: Vec = Vec::new(); + + for field in self.id_to_field.values() { + let name = self + .name_by_field_id(field.id) + .unwrap_or(field.name.as_str()); + + let min_version = field.field_type.min_format_version(); + if format_version < min_version { + problems.push(format!( + "Invalid type for {name}: {} is not supported until {min_version} but format version is {format_version}.", + field.field_type, + )); + } + + if let Some(default) = &field.initial_default + && format_version < MIN_FORMAT_VERSION_DEFAULT_VALUES + { + problems.push(format!( + "Invalid initial default for {name}: non-null default ({default:?}) is not supported until {MIN_FORMAT_VERSION_DEFAULT_VALUES} but format version is {format_version}." 
+ )); + } + } + + if problems.is_empty() { + Ok(()) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid schema for {format_version}:\n- {}", + problems.join("\n- ") + ), + )) + } + } } impl Display for Schema { @@ -447,6 +512,77 @@ mod tests { use crate::spec::values::Map as MapValue; use crate::spec::{Datum, Literal}; + #[test] + fn test_check_format_compatibility() { + use crate::spec::{FormatVersion, PrimitiveLiteral, VariantType}; + + fn schema_with(fields: Vec) -> Schema { + Schema::builder().with_fields(fields).build().unwrap() + } + + // Variant type requires v3. + let variant = schema_with(vec![ + NestedField::optional(1, "v", Type::Variant(VariantType)).into(), + ]); + assert!( + variant + .check_format_compatibility(FormatVersion::V2) + .is_err() + ); + assert!( + variant + .check_format_compatibility(FormatVersion::V3) + .is_ok() + ); + + // A non-null initial default requires v3, even for a v1-compatible type. + let with_default = schema_with(vec![ + NestedField::optional(1, "a", Type::Primitive(PrimitiveType::Int)) + .with_initial_default(Literal::Primitive(PrimitiveLiteral::Int(1))) + .into(), + ]); + let err = with_default + .check_format_compatibility(FormatVersion::V2) + .unwrap_err(); + assert!( + err.message().contains("Invalid initial default for a"), + "{err}" + ); + assert!( + with_default + .check_format_compatibility(FormatVersion::V3) + .is_ok() + ); + + // No default is fine at any version (an absent/null default never trips). + let no_default = schema_with(vec![ + NestedField::optional(1, "a", Type::Primitive(PrimitiveType::Int)).into(), + ]); + assert!( + no_default + .check_format_compatibility(FormatVersion::V1) + .is_ok() + ); + + // Recursion: a default on a field nested inside a struct is caught. 
+ let nested = schema_with(vec![ + NestedField::required( + 1, + "s", + Type::Struct(StructType::new(vec![ + NestedField::optional(2, "inner", Type::Primitive(PrimitiveType::Long)) + .with_initial_default(Literal::Primitive(PrimitiveLiteral::Long(7))) + .into(), + ])), + ) + .into(), + ]); + let err = nested + .check_format_compatibility(FormatVersion::V2) + .unwrap_err(); + assert!(err.message().contains("inner"), "{err}"); + } + #[test] fn test_construct_schema() { let field1: NestedFieldRef = diff --git a/crates/iceberg/src/spec/schema/prune_columns.rs b/crates/iceberg/src/spec/schema/prune_columns.rs index 14f1bfd25f..dfbde57656 100644 --- a/crates/iceberg/src/spec/schema/prune_columns.rs +++ b/crates/iceberg/src/spec/schema/prune_columns.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use crate::spec::VariantType; struct PruneColumn { selected: HashSet, @@ -238,6 +239,10 @@ impl SchemaVisitor for PruneColumn { fn primitive(&mut self, _p: &PrimitiveType) -> Result> { Ok(None) } + + fn variant(&mut self, _v: &VariantType) -> Result { + Ok(None) + } } #[cfg(test)] diff --git a/crates/iceberg/src/spec/schema/visitor.rs b/crates/iceberg/src/spec/schema/visitor.rs index 50f7c04caa..3287ad7ee8 100644 --- a/crates/iceberg/src/spec/schema/visitor.rs +++ b/crates/iceberg/src/spec/schema/visitor.rs @@ -16,6 +16,7 @@ // under the License. use super::*; +use crate::spec::VariantType; /// A post order schema visitor. /// @@ -69,6 +70,9 @@ pub trait SchemaVisitor { fn map(&mut self, map: &MapType, key_value: Self::T, value: Self::T) -> Result; /// Called when see a primitive type. fn primitive(&mut self, p: &PrimitiveType) -> Result; + + /// Called when see a variant type. + fn variant(&mut self, _v: &VariantType) -> Result; } /// Visiting a type in post order. 
@@ -99,6 +103,7 @@ pub(crate) fn visit_type<V: SchemaVisitor>(r#type: &Type, visitor: &mut V) -> Re visitor.map(map, key_result, value_result) } Type::Struct(s) => visit_struct(s, visitor), + Type::Variant(v) => visitor.variant(v), } } @@ -185,6 +190,8 @@ pub trait SchemaWithPartnerVisitor<P> { ) -> Result<Self::T>; /// Called when see a primitive type. fn primitive(&mut self, p: &PrimitiveType, partner: &P) -> Result<Self::T>; + /// Called when see a variant type. + fn variant(&mut self, _v: &VariantType, _partner: &P) -> Result<Self::T>; } /// Accessor used to get child partner from parent partner. @@ -242,6 +249,7 @@ pub(crate) fn visit_type_with_partner<P, V: SchemaWithPartnerVisitor<P>, A: Part visitor.map(map, partner, key_result, value_result) } Type::Struct(s) => visit_struct_with_partner(s, partner, visitor, accessor), + Type::Variant(v) => visitor.variant(v, partner), } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 607fd98350..597ace956f 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -535,6 +535,7 @@ impl TableMetadata { // Normalize location (remove trailing slash) self.location = self.location.trim_end_matches('/').to_string(); self.validate_snapshot_sequence_number()?; + self.validate_schema_format_compatibility()?; self.try_normalize_partition_spec()?; self.try_normalize_sort_order()?; Ok(self) @@ -749,6 +750,13 @@ impl TableMetadata { Ok(()) } + + /// Validates that every type used in the current schema is supported by the + /// table's format version. Delegates to [`Schema::check_format_compatibility`].
+ fn validate_schema_format_compatibility(&self) -> Result<()> { + self.current_schema() + .check_format_compatibility(self.format_version) + } } pub(super) mod _serde { diff --git a/crates/iceberg/src/spec/values/literal.rs b/crates/iceberg/src/spec/values/literal.rs index 69e5cc02a4..4ae3713d21 100644 --- a/crates/iceberg/src/spec/values/literal.rs +++ b/crates/iceberg/src/spec/values/literal.rs @@ -601,6 +601,10 @@ impl Literal { )) } } + Type::Variant(_) => Err(Error::new( + ErrorKind::DataInvalid, + "Variant type is not supported for single-value JSON serialization", + )), } } diff --git a/crates/iceberg/src/transaction/update_schema.rs b/crates/iceberg/src/transaction/update_schema.rs index da843d9b9b..276c2df3a9 100644 --- a/crates/iceberg/src/transaction/update_schema.rs +++ b/crates/iceberg/src/transaction/update_schema.rs @@ -181,6 +181,9 @@ fn assign_fresh_ids(field: &NestedField, next_id: &mut i32) -> NestedFieldRef { fn assign_fresh_ids_to_type(field_type: &Type, next_id: &mut i32) -> Type { match field_type { Type::Primitive(_) => field_type.clone(), + // Variant carries no nested fields, so there is nothing to reassign + // (matches id_reassigner.rs). 
+ Type::Variant(v) => Type::Variant(*v), Type::Struct(struct_type) => { let new_fields: Vec = struct_type .fields() @@ -279,7 +282,7 @@ fn rebuild_field( delete_ids: &HashSet, ) -> NestedFieldRef { match field.field_type.as_ref() { - Type::Primitive(_) => field.clone(), + Type::Primitive(_) | Type::Variant(_) => field.clone(), Type::Struct(s) => { let new_fields = rebuild_fields(s.fields(), adds, delete_ids, Some(field.id)); Arc::new(NestedField { diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..5cd9ebd138 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -40,7 +40,7 @@ use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, - StructType, TableMetadata, Type, visit_schema, + StructType, TableMetadata, Type, VariantType, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -113,6 +113,22 @@ impl IndexByParquetPathName { pub fn get(&self, name: &str) -> Option<&i32> { self.name_to_id.get(name) } + + fn insert_current_path(&mut self) -> Result<()> { + let full_name = self.field_names.iter().map(String::as_str).join("."); + let field_id = self.field_id; + if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}" + ), + )); + } else { + self.name_to_id.insert(full_name, field_id); + } + Ok(()) + } } impl Default for IndexByParquetPathName { @@ -191,20 +207,11 @@ impl SchemaVisitor for IndexByParquetPathName { } fn primitive(&mut self, _p: &PrimitiveType) -> Result { - let 
full_name = self.field_names.iter().map(String::as_str).join("."); - let field_id = self.field_id; - if let Some(existing_field_id) = self.name_to_id.get(full_name.as_str()) { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Invalid schema: multiple fields for name {full_name}: {field_id} and {existing_field_id}" - ), - )); - } else { - self.name_to_id.insert(full_name, field_id); - } + self.insert_current_path() + } - Ok(()) + fn variant(&mut self, _v: &VariantType) -> Result { + self.insert_current_path() } } diff --git a/crates/integration_tests/tests/read_variant.rs b/crates/integration_tests/tests/read_variant.rs new file mode 100644 index 0000000000..9d8801fb11 --- /dev/null +++ b/crates/integration_tests/tests/read_variant.rs @@ -0,0 +1,403 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Integration tests for variant type support. +//! +//! These tests require a running Docker environment seeded by `dev/spark/provision.py`. +//! The Spark 4.0 provisioner creates `rest.default.test_variant_column` with a +//! `VARIANT` column containing three rows of JSON data. 
+ +use std::sync::Arc; + +use arrow_array::StructArray; +use arrow_array::cast::AsArray; +use arrow_schema::DataType; +use futures::TryStreamExt; +use iceberg::spec::Type; +use iceberg::{Catalog, CatalogBuilder, TableIdent}; +use iceberg_catalog_rest::{RestCatalog, RestCatalogBuilder}; +use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; + +/// Build a `RestCatalog` against the docker-compose test fixture, wired with +/// the S3 storage factory the seeded tables expect. +async fn rest_catalog() -> RestCatalog { + let fixture = get_test_fixture(); + RestCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 { + customized_credential_load: None, + })) + .load("rest", fixture.catalog_config.clone()) + .await + .unwrap() +} + +/// Asserts that `dt` is `Struct(metadata: Binary, value: Binary)` — the Parquet +/// physical layout of a variant column. +fn assert_variant_struct(dt: &DataType) { + let DataType::Struct(fields) = dt else { + panic!("expected variant to be DataType::Struct, got {dt:?}"); + }; + assert_eq!( + fields.len(), + 2, + "variant struct must have exactly 2 sub-fields" + ); + assert!( + fields + .iter() + .any(|f| f.name() == "metadata" && f.data_type() == &DataType::Binary), + "variant struct missing 'metadata: Binary' sub-field: {fields:?}" + ); + assert!( + fields + .iter() + .any(|f| f.name() == "value" && f.data_type() == &DataType::Binary), + "variant struct missing 'value: Binary' sub-field: {fields:?}" + ); +} + +/// Verifies that a table written by Spark with a VARIANT column has its schema +/// parsed into `Type::Variant` by the Rust iceberg implementation. 
+#[tokio::test] +async fn test_variant_schema_is_parsed() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + let schema = table.metadata().current_schema(); + let variant_field = schema + .field_by_name("v") + .expect("field 'v' not found in schema"); + + assert!( + matches!(variant_field.field_type.as_ref(), Type::Variant(_)), + "Expected Type::Variant for field 'v', got {:?}", + variant_field.field_type, + ); +} + +/// Verifies that scanning a table with a VARIANT column produces an Arrow batch +/// where the variant column is represented as `Struct(metadata: Binary, value: Binary)`, +/// matching the Parquet physical layout (§3.3 of the Parquet Variant spec). +#[tokio::test] +async fn test_variant_arrow_schema() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + let scan = table.scan().build().unwrap(); + let batch_stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert!(!batches.is_empty(), "expected at least one record batch"); + + // Variant column must be a struct with exactly two binary sub-fields + let v_col = batches[0] + .column_by_name("v") + .expect("column 'v' not found in batch"); + + let DataType::Struct(fields) = v_col.data_type() else { + panic!( + "Expected variant column to be DataType::Struct, got {:?}", + v_col.data_type() + ); + }; + + assert_eq!( + fields.len(), + 2, + "variant struct must have exactly 2 sub-fields" + ); + + let metadata_field = fields + .iter() + .find(|f| f.name() == "metadata") + .expect("sub-field 'metadata' not found"); + let value_field = fields + .iter() + .find(|f| f.name() == "value") + .expect("sub-field 'value' not found"); + + assert_eq!( + metadata_field.data_type(), + &DataType::Binary, + 
"'metadata' sub-field must be DataType::Binary" + ); + assert_eq!( + value_field.data_type(), + &DataType::Binary, + "'value' sub-field must be DataType::Binary" + ); +} + +/// Verifies that a variant column is NOT silently dropped when it is projected +/// alongside ordinary primitive columns (regression test for the projection bug +/// where variant sub-fields had no embedded Parquet field IDs and were therefore +/// excluded from `column_map`, causing the whole variant group to be omitted). +#[tokio::test] +async fn test_variant_projected_with_primitive_columns() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + // Explicitly select only the two columns — this exercises the projection path + // that was previously broken for variant types. + let scan = table.scan().select(["id", "v"]).build().unwrap(); + let batch_stream = scan.to_arrow().await.unwrap(); + let batches: Vec<_> = batch_stream.try_collect().await.unwrap(); + + assert!(!batches.is_empty(), "expected at least one record batch"); + + let first_batch = &batches[0]; + + // Both columns must be present — the variant must not be silently dropped. + assert!( + first_batch.column_by_name("id").is_some(), + "column 'id' not found in projected batch" + ); + let v_col = first_batch + .column_by_name("v") + .expect("column 'v' was silently dropped from projected scan — projection bug regression"); + + // The variant column must still be a struct with the expected sub-fields. 
+ let DataType::Struct(fields) = v_col.data_type() else { + panic!( + "Expected variant column to be DataType::Struct after projection, got {:?}", + v_col.data_type() + ); + }; + assert_eq!( + fields.len(), + 2, + "projected variant struct must have exactly 2 sub-fields" + ); + assert!( + fields.iter().any(|f| f.name() == "metadata"), + "projected variant struct must have a 'metadata' sub-field" + ); + assert!( + fields.iter().any(|f| f.name() == "value"), + "projected variant struct must have a 'value' sub-field" + ); + + // All three seeded rows must be readable. + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3, "expected exactly 3 rows"); +} + +/// Verifies that projecting ONLY a variant column (without any sibling +/// primitive) still includes both of its sub-leaves in the projection mask. +/// This exercises a different branch of the mask builder than +/// [`test_variant_projected_with_primitive_columns`], where `column_map` +/// contains only variant-derived entries. 
+#[tokio::test] +async fn test_variant_projected_alone() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_column"]).unwrap()) + .await + .unwrap(); + + let scan = table.scan().select(["v"]).build().unwrap(); + let batches: Vec<_> = scan.to_arrow().await.unwrap().try_collect().await.unwrap(); + + assert!(!batches.is_empty(), "expected at least one record batch"); + let v_col = batches[0] + .column_by_name("v") + .expect("variant-only projection dropped column 'v'"); + assert_variant_struct(v_col.data_type()); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Verifies that a table with multiple top-level variant columns is readable +/// end-to-end — guards against the Avro record-name collision +#[tokio::test] +async fn test_multiple_variant_columns() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_multi"]).unwrap()) + .await + .unwrap(); + + // Both variant fields must parse as Type::Variant in the Iceberg schema. + let schema = table.metadata().current_schema(); + for name in ["v1", "v2"] { + let f = schema + .field_by_name(name) + .unwrap_or_else(|| panic!("field '{name}' missing from schema")); + assert!( + matches!(f.field_type.as_ref(), Type::Variant(_)), + "field '{name}' is not Type::Variant: {:?}", + f.field_type + ); + } + + // Full scan returns both variant columns with the correct physical shape. 
+ let batches: Vec<_> = table + .scan() + .build() + .unwrap() + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!(!batches.is_empty()); + assert_variant_struct( + batches[0] + .column_by_name("v1") + .expect("missing v1") + .data_type(), + ); + assert_variant_struct( + batches[0] + .column_by_name("v2") + .expect("missing v2") + .data_type(), + ); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Full scan of a table where a variant lives inside a struct. Confirms that +/// the nested `payload: VARIANT` sub-field is materialized as +/// `Struct(metadata, value)` in the output. +#[tokio::test] +async fn test_nested_variant_full_scan() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_nested"]).unwrap()) + .await + .unwrap(); + + let batches: Vec<_> = table + .scan() + .build() + .unwrap() + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!(!batches.is_empty()); + + let nested = batches[0] + .column_by_name("nested") + .expect("column 'nested' missing"); + let nested_struct: &StructArray = nested.as_struct(); + let payload = nested_struct + .column_by_name("payload") + .expect("inner 'payload' field missing from nested struct"); + assert_variant_struct(payload.data_type()); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Regression: projecting a struct column that contains a variant must keep +/// the inner variant's sub-leaves. 
+#[tokio::test] +async fn test_nested_variant_projected() { + let rest_catalog = rest_catalog().await; + + let table = rest_catalog + .load_table(&TableIdent::from_strs(["default", "test_variant_nested"]).unwrap()) + .await + .unwrap(); + + let batches: Vec<_> = table + .scan() + .select(["nested"]) + .build() + .unwrap() + .to_arrow() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert!(!batches.is_empty()); + let nested = batches[0] + .column_by_name("nested") + .expect("projected scan dropped column 'nested'"); + let nested_struct: &StructArray = nested.as_struct(); + let payload = nested_struct.column_by_name("payload").expect( + "nested.payload (variant) was dropped from the projected scan — \ + nested-variant projection bug regression", + ); + assert_variant_struct(payload.data_type()); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 3); +} + +/// Projecting only a sibling primitive must NOT pull in the variant's leaves +/// from a neighbouring struct field. Guards against over-eager variant +/// inclusion in the projection mask. 
+#[tokio::test]
+async fn test_nested_variant_sibling_projection() {
+    let catalog = rest_catalog().await;
+    let ident = TableIdent::from_strs(["default", "test_variant_nested"]).unwrap();
+    let table = catalog.load_table(&ident).await.unwrap();
+
+    // Ask for the primitive column only; the struct holding the variant is
+    // deliberately left out of the selection.
+    let scan = table.scan().select(["id"]).build().unwrap();
+    let stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<_> = stream.try_collect().await.unwrap();
+
+    assert!(!batches.is_empty());
+    let first_batch = &batches[0];
+    assert!(
+        first_batch.column_by_name("id").is_some(),
+        "column 'id' missing"
+    );
+    assert!(
+        first_batch.column_by_name("nested").is_none(),
+        "projecting only 'id' must not pull in sibling 'nested' (variant leaked)"
+    );
+
+    let row_count: usize = batches.iter().map(|batch| batch.num_rows()).sum();
+    assert_eq!(row_count, 3);
+}
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..af8932d734 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -29,6 +29,7 @@ use futures::StreamExt; use futures::future::try_join_all; use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids; use iceberg::inspect::MetadataTableType; +use iceberg::spec::FormatVersion; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use crate::table::IcebergTableProvider; @@ -163,10 +164,14 @@ impl SchemaProvider for IcebergSchemaProvider { let iceberg_schema = arrow_schema_to_schema_auto_assign_ids(df_schema.as_ref()) .map_err(to_datafusion_error)?; + // Use at least V2, and upgrade to V3 if the schema requires it (e.g. timestamp_ns / variant).
+ let format_version = iceberg_schema.min_format_version().max(FormatVersion::V2); + // Create the table in the Iceberg catalog let table_creation = TableCreation::builder() .name(name.clone()) .schema(iceberg_schema) + .format_version(format_version) .build(); let catalog = self.catalog.clone(); diff --git a/dev/spark/provision.py b/dev/spark/provision.py index 40f9ba0f38..cf1531b35f 100644 --- a/dev/spark/provision.py +++ b/dev/spark/provision.py @@ -129,6 +129,64 @@ spark.sql("ALTER TABLE rest.default.test_promote_partition_column ALTER COLUMN baz TYPE decimal(6, 2)") spark.sql("INSERT INTO rest.default.test_promote_partition_column VALUES (25, 22.25, 22.25)") +# Create a table with a variant column +spark.sql(""" +CREATE OR REPLACE TABLE rest.default.test_variant_column ( + id INT, + v VARIANT +) +USING iceberg +TBLPROPERTIES ('format-version'='3') +""") + +spark.sql(""" +INSERT INTO rest.default.test_variant_column +VALUES + (1, PARSE_JSON('{"a": 1, "b": "hello"}')), + (2, PARSE_JSON('[1, 2, 3]')), + (3, PARSE_JSON('42')) +""") + +# Table with TWO top-level variant columns — exercises the Avro +# record-name-collision path and multi-variant projection. +spark.sql(""" +CREATE OR REPLACE TABLE rest.default.test_variant_multi ( + id INT, + v1 VARIANT, + v2 VARIANT +) +USING iceberg +TBLPROPERTIES ('format-version'='3') +""") + +spark.sql(""" +INSERT INTO rest.default.test_variant_multi +VALUES + (1, PARSE_JSON('{"a": 1}'), PARSE_JSON('"x"')), + (2, PARSE_JSON('[1, 2, 3]'), PARSE_JSON('true')), + (3, PARSE_JSON('42'), PARSE_JSON('null')) +""") + +# Table with a VARIANT nested inside a struct — exercises projection of a +# variant that is not a top-level Arrow field (regression for the bug where +# nested variant sub-leaves were silently dropped from the projection mask). 
+spark.sql(""" +CREATE OR REPLACE TABLE rest.default.test_variant_nested ( + id INT, + nested STRUCT +) +USING iceberg +TBLPROPERTIES ('format-version'='3') +""") + +spark.sql(""" +INSERT INTO rest.default.test_variant_nested +VALUES + (1, named_struct('name', 'a', 'payload', PARSE_JSON('{"k": 1}'))), + (2, named_struct('name', 'b', 'payload', PARSE_JSON('[1, 2]'))), + (3, named_struct('name', 'c', 'payload', PARSE_JSON('42'))) +""") + # Create a table with various types spark.sql(""" CREATE OR REPLACE TABLE rest.default.types_test USING ICEBERG AS