diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index c51f6a6a89..11a9ffd0bc 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -681,6 +681,7 @@ impl Catalog for GlueCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index d778a3d5fc..c76dab8ed3 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -628,6 +628,7 @@ impl Catalog for HmsCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c7bf9d0cfd..4e6413fb46 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -778,6 +778,7 @@ impl Catalog for SqlCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index af74bff6fe..2bd888faa2 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -7506,6 +7506,55 @@ impl bnum::cast::As for iceberg::spec::ManifestList pub fn iceberg::spec::ManifestList::as_(self) -> T where T: bnum::cast::CastFrom impl ppv_lite86::types::VZip for iceberg::spec::ManifestList where V: ppv_lite86::types::MultiLane pub fn iceberg::spec::ManifestList::vzip(self) -> V +pub struct iceberg::spec::ManifestListReader<'a> +impl<'a> iceberg::spec::ManifestListReader<'a> +pub async fn iceberg::spec::ManifestListReader<'a>::load(&self) -> iceberg::Result +impl<'a> core::marker::Freeze for iceberg::spec::ManifestListReader<'a> +impl<'a> core::marker::Send for iceberg::spec::ManifestListReader<'a> +impl<'a> core::marker::Sync for iceberg::spec::ManifestListReader<'a> +impl<'a> core::marker::Unpin for iceberg::spec::ManifestListReader<'a> +impl<'a> !core::panic::unwind_safe::RefUnwindSafe for iceberg::spec::ManifestListReader<'a> +impl<'a> !core::panic::unwind_safe::UnwindSafe for iceberg::spec::ManifestListReader<'a> +impl core::convert::Into for iceberg::spec::ManifestListReader<'a> where U: core::convert::From +pub fn iceberg::spec::ManifestListReader<'a>::into(self) -> U +impl core::convert::TryFrom for iceberg::spec::ManifestListReader<'a> where U: core::convert::Into +pub type iceberg::spec::ManifestListReader<'a>::Error = core::convert::Infallible +pub fn iceberg::spec::ManifestListReader<'a>::try_from(value: U) -> core::result::Result>::Error> +impl core::convert::TryInto for iceberg::spec::ManifestListReader<'a> where U: core::convert::TryFrom +pub type iceberg::spec::ManifestListReader<'a>::Error = >::Error +pub fn iceberg::spec::ManifestListReader<'a>::try_into(self) -> core::result::Result>::Error> +impl as_any::AsAny for iceberg::spec::ManifestListReader<'a> where T: core::any::Any +pub fn iceberg::spec::ManifestListReader<'a>::as_any(&self) -> &(dyn core::any::Any + 'static) +pub fn iceberg::spec::ManifestListReader<'a>::as_any_mut(&mut self) -> &mut (dyn core::any::Any + 'static) +pub fn iceberg::spec::ManifestListReader<'a>::type_name(&self) -> &'static str +impl as_any::Downcast for iceberg::spec::ManifestListReader<'a> where T: as_any::AsAny + ?core::marker::Sized +impl core::any::Any for iceberg::spec::ManifestListReader<'a> where T: 'static + ?core::marker::Sized +pub fn iceberg::spec::ManifestListReader<'a>::type_id(&self) -> core::any::TypeId +impl core::borrow::Borrow for iceberg::spec::ManifestListReader<'a> where T: ?core::marker::Sized +pub fn iceberg::spec::ManifestListReader<'a>::borrow(&self) -> &T +impl core::borrow::BorrowMut for iceberg::spec::ManifestListReader<'a> where T: ?core::marker::Sized +pub fn iceberg::spec::ManifestListReader<'a>::borrow_mut(&mut self) -> &mut T +impl core::convert::From for iceberg::spec::ManifestListReader<'a> +pub fn iceberg::spec::ManifestListReader<'a>::from(t: T) -> T +impl crossbeam_epoch::atomic::Pointable for iceberg::spec::ManifestListReader<'a> +pub type iceberg::spec::ManifestListReader<'a>::Init = T +pub const iceberg::spec::ManifestListReader<'a>::ALIGN: usize +pub unsafe fn iceberg::spec::ManifestListReader<'a>::deref<'a>(ptr: usize) -> &'a T +pub unsafe fn iceberg::spec::ManifestListReader<'a>::deref_mut<'a>(ptr: usize) -> &'a mut T +pub unsafe fn iceberg::spec::ManifestListReader<'a>::drop(ptr: usize) +pub unsafe fn iceberg::spec::ManifestListReader<'a>::init(init: ::Init) -> usize +impl either::into_either::IntoEither for iceberg::spec::ManifestListReader<'a> +impl tower_http::follow_redirect::policy::PolicyExt for iceberg::spec::ManifestListReader<'a> where T: ?core::marker::Sized +pub fn iceberg::spec::ManifestListReader<'a>::and(self, other: P) -> tower_http::follow_redirect::policy::and::And where T: tower_http::follow_redirect::policy::Policy, P: tower_http::follow_redirect::policy::Policy +pub fn iceberg::spec::ManifestListReader<'a>::or(self, other: P) -> tower_http::follow_redirect::policy::or::Or where T: tower_http::follow_redirect::policy::Policy, P: tower_http::follow_redirect::policy::Policy +impl tracing::instrument::Instrument for iceberg::spec::ManifestListReader<'a> +impl tracing::instrument::WithSubscriber for iceberg::spec::ManifestListReader<'a> +impl typenum::type_operators::Same for iceberg::spec::ManifestListReader<'a> +pub type iceberg::spec::ManifestListReader<'a>::Output = T +impl bnum::cast::As for iceberg::spec::ManifestListReader<'a> +pub fn iceberg::spec::ManifestListReader<'a>::as_(self) -> T where T: bnum::cast::CastFrom +impl ppv_lite86::types::VZip for iceberg::spec::ManifestListReader<'a> where V: ppv_lite86::types::MultiLane +pub fn iceberg::spec::ManifestListReader<'a>::vzip(self) -> V pub struct iceberg::spec::ManifestListWriter impl iceberg::spec::ManifestListWriter pub fn iceberg::spec::ManifestListWriter::add_manifests(&mut self, manifests: impl core::iter::traits::iterator::Iterator) -> iceberg::Result<()> @@ -11004,6 +11053,7 @@ pub fn iceberg::table::Table::current_schema_ref(&self) -> iceberg::spec::Schema pub fn iceberg::table::Table::file_io(&self) -> &iceberg::io::FileIO pub fn iceberg::table::Table::identifier(&self) -> &iceberg::TableIdent pub fn iceberg::table::Table::inspect(&self) -> iceberg::inspect::MetadataTable<'_> +pub fn iceberg::table::Table::manifest_list_reader<'a>(&'a self, snapshot: &'a iceberg::spec::Snapshot) -> iceberg::spec::ManifestListReader<'a> pub fn iceberg::table::Table::metadata(&self) -> &iceberg::spec::TableMetadata pub fn iceberg::table::Table::metadata_location(&self) -> core::option::Option<&str> pub fn iceberg::table::Table::metadata_location_result(&self) -> iceberg::Result<&str> diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 3ae01a23df..2f2e2a215e 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -347,6 +347,7 @@ impl Catalog for MemoryCatalog { table_info.file_io(), table_info.metadata(), table_info.metadata_location(), + table_info.encryption_manager(), ) .await } diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index d450f9df80..22f491a9dd 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -22,8 +22,9 @@ use std::collections::HashSet; use futures::{TryStreamExt, stream}; use crate::Result; +use crate::encryption::EncryptionManager; use crate::io::FileIO; -use crate::spec::TableMetadata; +use crate::spec::{ManifestListReader, TableMetadata}; const DELETE_CONCURRENCY: usize = 10; @@ -40,6 +41,7 @@ pub async fn drop_table_data( io: &FileIO, metadata: &TableMetadata, metadata_location: Option<&str>, + encryption_manager: Option<&EncryptionManager>, ) -> Result<()> { let mut manifest_lists_to_delete: HashSet = HashSet::new(); let mut manifests_to_delete: HashSet = HashSet::new(); @@ -47,7 +49,9 @@ pub async fn drop_table_data( // Load all manifest lists concurrently let results: Vec<_> = futures::future::try_join_all(metadata.snapshots().map(|snapshot| async { - let manifest_list = snapshot.load_manifest_list(io, metadata).await?; + let manifest_list = ManifestListReader::new(snapshot, io, metadata, encryption_manager) + .load() + .await?; Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list)) })) .await?; diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 4c30ca2ec5..a985786460 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -161,9 +161,7 @@ impl<'a> ManifestsTable<'a> { let mut partition_summaries = self.partition_summary_builder()?; if let Some(snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) - .await?; + let manifest_list = self.table.manifest_list_reader(snapshot).load().await?; for manifest in manifest_list.entries() { content.append_value(manifest.content as i32); path.append_value(manifest.manifest_path.clone()); diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 5de45e2acc..7b028b7470 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -18,9 +18,11 @@ use std::mem::size_of_val; use std::sync::Arc; +use crate::encryption::EncryptionManager; use crate::io::FileIO; use crate::spec::{ - FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, + FormatVersion, Manifest, ManifestFile, ManifestList, ManifestListReader, SchemaId, SnapshotRef, + TableMetadataRef, }; use crate::{Error, ErrorKind, Result}; @@ -44,20 +46,25 @@ pub struct ObjectCache { cache: moka::future::Cache, file_io: FileIO, cache_disabled: bool, + encryption_manager: Option>, } impl ObjectCache { /// Creates a new [`ObjectCache`] /// with the default cache size - pub(crate) fn new(file_io: FileIO) -> Self { - Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES) + pub(crate) fn new(file_io: FileIO, encryption_manager: Option>) -> Self { + Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES, encryption_manager) } /// Creates a new [`ObjectCache`] /// with a specific cache size - pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self { + pub(crate) fn new_with_capacity( + file_io: FileIO, + cache_size_bytes: u64, + encryption_manager: Option>, + ) -> Self { if cache_size_bytes == 0 { - Self::with_disabled_cache(file_io) + Self::with_disabled_cache(file_io, encryption_manager) } else { Self { cache: moka::future::Cache::builder() @@ -69,17 +76,22 @@ impl ObjectCache { .build(), file_io, cache_disabled: false, + encryption_manager, } } } /// Creates a new [`ObjectCache`] /// with caching disabled - pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self { + pub(crate) fn with_disabled_cache( + file_io: FileIO, + encryption_manager: Option>, + ) -> Self { Self { cache: moka::future::Cache::new(0), file_io, cache_disabled: true, + encryption_manager, } } @@ -126,10 +138,15 @@ impl ObjectCache { table_metadata: &TableMetadataRef, ) -> Result> { if self.cache_disabled { - return snapshot - .load_manifest_list(&self.file_io, table_metadata) - .await - .map(Arc::new); + return ManifestListReader::new( + snapshot, + &self.file_io, + table_metadata, + self.encryption_manager.as_deref(), + ) + .load() + .await + .map(Arc::new); } let key = CachedObjectKey::ManifestList(( @@ -173,9 +190,14 @@ impl ObjectCache { snapshot: &SnapshotRef, table_metadata: &TableMetadataRef, ) -> Result { - let manifest_list = snapshot - .load_manifest_list(&self.file_io, table_metadata) - .await?; + let manifest_list = ManifestListReader::new( + snapshot, + &self.file_io, + table_metadata, + self.encryption_manager.as_deref(), + ) + .load() + .await?; Ok(CachedItem::ManifestList(Arc::new(manifest_list))) } @@ -319,7 +341,7 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone()); + let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone(), None); let result_manifest_list = object_cache .get_manifest_list( @@ -352,7 +374,7 @@ mod tests { let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; - let object_cache = ObjectCache::new(fixture.table.file_io().clone()); + let object_cache = ObjectCache::new(fixture.table.file_io().clone(), None); // not in cache let result_manifest_list = object_cache diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index baaab1f590..15dbda2b99 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -28,7 +28,8 @@ use serde_derive::{Deserialize, Serialize}; use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}; use self::_serde::{ManifestFileV1, ManifestFileV2}; -use super::{FormatVersion, Manifest}; +use super::{FormatVersion, Manifest, Snapshot, TableMetadata}; +use crate::encryption::{EncryptedInputFile, EncryptionManager}; use crate::error::Result; use crate::io::{FileIO, OutputFile}; use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3; @@ -90,6 +91,61 @@ impl ManifestList { } } +/// A manifest list reader that encapsulates the logic for loading and parsing a [`ManifestList`] +/// from a snapshot. +pub struct ManifestListReader<'a> { + snapshot: &'a Snapshot, + file_io: &'a FileIO, + table_metadata: &'a TableMetadata, + encryption_manager: Option<&'a EncryptionManager>, +} + +impl<'a> ManifestListReader<'a> { + pub(crate) fn new( + snapshot: &'a Snapshot, + file_io: &'a FileIO, + table_metadata: &'a TableMetadata, + encryption_manager: Option<&'a EncryptionManager>, + ) -> Self { + Self { + snapshot, + file_io, + table_metadata, + encryption_manager, + } + } + + /// Loads and returns the [`ManifestList`] for this snapshot. + pub async fn load(&self) -> Result { + let manifest_list_content = match ( + self.snapshot.encryption_key_id(), + self.encryption_manager, + ) { + (Some(_), None) => { + return Err(Error::new( + ErrorKind::PreconditionFailed, + "Snapshot has encryption_key_id but no EncryptionManager configured on Table", + )); + } + (Some(key_id), Some(em)) => { + let key_metadata = em.decrypt_manifest_list_key_metadata(key_id).await?; + let input = self.file_io.new_input(self.snapshot.manifest_list())?; + EncryptedInputFile::new(input, key_metadata).read().await? + } + (None, _) => { + self.file_io + .new_input(self.snapshot.manifest_list())? + .read() + .await? + } + }; + ManifestList::parse_with_version( + &manifest_list_content, + self.table_metadata.format_version(), + ) + } +} + /// A manifest list writer. pub struct ManifestListWriter { format_version: FormatVersion, diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 3b8a3c934e..87adf7c3c5 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -28,7 +28,7 @@ use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; use crate::error::{Result, timestamp_ms_to_utc}; use crate::io::FileIO; -use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata}; +use crate::spec::{ManifestList, ManifestListReader, SchemaId, SchemaRef, TableMetadata}; use crate::{Error, ErrorKind}; /// The ref name of the main branch of the table. @@ -195,18 +195,15 @@ impl Snapshot { } /// Load manifest list. + #[deprecated(since = "0.9.0", note = "Use `Table::manifest_list_reader()` instead")] pub async fn load_manifest_list( &self, file_io: &FileIO, table_metadata: &TableMetadata, ) -> Result { - let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?; - ManifestList::parse_with_version( - &manifest_list_content, - // TODO: You don't really need the version since you could just project any Avro in - // the version that you'd like to get (probably always the latest) - table_metadata.format_version(), - ) + ManifestListReader::new(self, file_io, table_metadata, None) + .load() + .await } #[allow(dead_code)] @@ -521,12 +518,81 @@ impl SnapshotRetention { #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; + use bytes::Bytes; use chrono::{TimeZone, Utc}; - use crate::spec::TableMetadata; + use crate::encryption::kms::{KeyManagementClient, MemoryKeyManagementClient}; + use crate::encryption::{EncryptionManager, StandardKeyMetadata}; + use crate::io::FileIO; + use crate::spec::manifest_list::{ManifestList, ManifestListWriter}; use crate::spec::snapshot::_serde::SnapshotV1; use crate::spec::snapshot::{Operation, Snapshot, Summary}; + use crate::spec::{ManifestListReader, TableMetadata}; + + const ENCRYPTION_TEST_V3_METADATA: &str = r#"{ + "format-version": 3, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "memory:///table", + "last-sequence-number": 0, + "last-updated-ms": 1602638573590, + "last-column-id": 1, + "current-schema-id": 0, + "schemas": [{"type": "struct", "schema-id": 0, "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], + "refs": {}, + "next-row-id": 0 + }"#; + + fn encryption_test_metadata() -> TableMetadata { + serde_json::from_str(ENCRYPTION_TEST_V3_METADATA).unwrap() + } + + fn encryption_test_kms() -> Arc { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + Arc::new(kms) + } + + fn encryption_test_manager() -> EncryptionManager { + EncryptionManager::builder() + .kms_client(encryption_test_kms()) + .table_key_id("master-1") + .build() + } + + async fn write_v3_manifest_list_bytes(io: &FileIO, path: &str) -> Bytes { + let output = io.new_output(path).unwrap(); + let mut writer = ManifestListWriter::v3(output, 1, None, 0, Some(0)); + writer.add_manifests(std::iter::empty()).unwrap(); + writer.close().await.unwrap(); + io.new_input(path).unwrap().read().await.unwrap() + } + + fn snapshot_pointing_at(manifest_list_path: &str, key_id: Option) -> Snapshot { + Snapshot::builder() + .with_snapshot_id(1) + .with_sequence_number(0) + .with_timestamp_ms(0) + .with_manifest_list(manifest_list_path.to_string()) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_encryption_key_id(key_id) + .build() + } #[test] fn schema() { @@ -729,4 +795,53 @@ mod tests { assert_eq!(v2_snapshot.parent_snapshot_id(), None); assert_eq!(v2_snapshot.schema_id(), None); } + + #[tokio::test] + async fn load_manifest_list_errors_when_encrypted_but_no_manager_configured() { + let io = FileIO::new_with_memory(); + let snapshot = snapshot_pointing_at( + "memory:///table/metadata/manifest-list-enc.avro", + Some("k1".to_string()), + ); + let metadata = encryption_test_metadata(); + + let err = ManifestListReader::new(&snapshot, &io, &metadata, None) + .load() + .await + .unwrap_err(); + assert_eq!(err.kind(), crate::ErrorKind::PreconditionFailed); + } + + #[tokio::test] + async fn load_manifest_list_decrypts_roundtrip() { + let io = FileIO::new_with_memory(); + let plain_path = "memory:///table/metadata/manifest-list-plain.avro"; + let encrypted_path = "memory:///table/metadata/manifest-list-enc.avro"; + + // Build raw manifest list bytes via the standard writer. + let raw_bytes = write_v3_manifest_list_bytes(&io, plain_path).await; + + // Encrypt those bytes to a second path and capture the file's key metadata. + let mgr = encryption_test_manager(); + let encrypted_output = mgr.encrypt(io.new_output(encrypted_path).unwrap()); + let std_key_metadata: StandardKeyMetadata = encrypted_output.key_metadata().clone(); + encrypted_output.write(raw_bytes).await.unwrap(); + + // Wrap the file's key metadata with a KEK and record the resulting wrapped + // entry's id on the snapshot. + let key_id = mgr + .encrypt_manifest_list_key_metadata(&std_key_metadata) + .await + .unwrap(); + + let snapshot = snapshot_pointing_at(encrypted_path, Some(key_id)); + let metadata = encryption_test_metadata(); + + let manifest_list: ManifestList = + ManifestListReader::new(&snapshot, &io, &metadata, Some(&mgr)) + .load() + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 0); + } } diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index dc21da565c..883f05e5a5 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -128,6 +128,11 @@ pub struct TableProperties { pub cdc_max_chunk_size: usize, /// Content-defined chunking normalization level (gearhash bit adjustment). pub cdc_norm_level: i32, + /// The master key id used to encrypt this table's manifest list and data + /// files. `None` if `encryption.key-id` is not set. + pub encryption_key_id: Option, + /// The encryption data encryption key length in bytes. + pub encryption_data_key_length: usize, } impl TableProperties { @@ -253,6 +258,15 @@ impl TableProperties { "write.parquet.content-defined-chunking.norm-level"; /// Default matches `parquet::file::properties::DEFAULT_CDC_NORM_LEVEL`. pub const PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT: i32 = 0; + + /// Property key for the master key id used to encrypt the table's manifest + /// list and data files as defined in https://iceberg.apache.org/docs/nightly/encryption/. + pub const PROPERTY_ENCRYPTION_KEY_ID: &str = "encryption.key-id"; + + /// Property key for the encryption data encryption key (DEK) length in bytes. + pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH: &str = "encryption.data-key-length"; + /// Default value for the encryption DEK length (16 bytes = AES-128). + pub const PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT: usize = 16; } impl TryFrom<&HashMap> for TableProperties { @@ -322,6 +336,14 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL, TableProperties::PROPERTY_PARQUET_CDC_NORM_LEVEL_DEFAULT, )?, + encryption_key_id: props + .get(TableProperties::PROPERTY_ENCRYPTION_KEY_ID) + .cloned(), + encryption_data_key_length: parse_property( + props, + TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH, + TableProperties::PROPERTY_ENCRYPTION_DATA_KEY_LENGTH_DEFAULT, + )?, }) } } diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index d2ba93f854..efd7df3234 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -20,12 +20,16 @@ use std::sync::Arc; use crate::arrow::ArrowReaderBuilder; +use crate::encryption::kms::KeyManagementClient; +use crate::encryption::{AesKeySize, EncryptionManager}; use crate::inspect::MetadataTable; use crate::io::FileIO; use crate::io::object_cache::ObjectCache; use crate::runtime::Runtime; use crate::scan::TableScanBuilder; -use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef}; +use crate::spec::{ + FormatVersion, ManifestListReader, SchemaRef, Snapshot, TableMetadata, TableMetadataRef, +}; use crate::{Error, ErrorKind, Result, TableIdent}; /// Builder to create table scan. @@ -34,6 +38,7 @@ pub struct TableBuilder { metadata_location: Option, metadata: Option, identifier: Option, + kms_client: Option>, readonly: bool, disable_cache: bool, cache_size_bytes: Option, @@ -47,6 +52,7 @@ impl TableBuilder { metadata_location: None, metadata: None, identifier: None, + kms_client: None, readonly: false, disable_cache: false, cache_size_bytes: None, @@ -104,6 +110,16 @@ impl TableBuilder { self } + /// optional - sets the KMS client used to unwrap keys for table encryption. + /// + /// If the table metadata has the `encryption.key-id` property set, a + /// [`KeyManagementClient`] must be provided here so the table can build + /// an [`EncryptionManager`]; otherwise [`Self::build`] will return an error. + pub fn kms_client(mut self, kms_client: Arc) -> Self { + self.kms_client = Some(kms_client); + self + } + /// build the Table pub fn build(self) -> Result { let Self { @@ -111,6 +127,7 @@ impl TableBuilder { metadata_location, metadata, identifier, + kms_client, readonly, disable_cache, cache_size_bytes, @@ -145,15 +162,24 @@ impl TableBuilder { )); }; + let encryption_manager = maybe_configure_encryption(kms_client.as_ref(), &metadata)?; + let object_cache = if disable_cache { - Arc::new(ObjectCache::with_disabled_cache(file_io.clone())) + Arc::new(ObjectCache::with_disabled_cache( + file_io.clone(), + encryption_manager.clone(), + )) } else if let Some(cache_size_bytes) = cache_size_bytes { Arc::new(ObjectCache::new_with_capacity( file_io.clone(), cache_size_bytes, + encryption_manager.clone(), )) } else { - Arc::new(ObjectCache::new(file_io.clone())) + Arc::new(ObjectCache::new( + file_io.clone(), + encryption_manager.clone(), + )) }; Ok(Table { @@ -164,6 +190,7 @@ impl TableBuilder { readonly, object_cache, runtime, + encryption_manager, }) } } @@ -178,6 +205,7 @@ pub struct Table { readonly: bool, object_cache: Arc, runtime: Runtime, + encryption_manager: Option>, } impl Table { @@ -238,6 +266,16 @@ impl Table { self.object_cache.clone() } + /// Returns the [`EncryptionManager`] for this table, if encryption is + /// configured. + /// + /// A manager is present iff the table metadata has the + /// `encryption.key-id` property set and a [`KeyManagementClient`] was + /// supplied to the [`TableBuilder`]. + pub fn encryption_manager(&self) -> Option<&EncryptionManager> { + self.encryption_manager.as_deref() + } + /// Creates a table scan. pub fn scan(&self) -> TableScanBuilder<'_> { TableScanBuilder::new(self) @@ -264,6 +302,16 @@ impl Table { self.metadata.current_schema().clone() } + /// Creates a [`ManifestListReader`] for the given snapshot. + pub fn manifest_list_reader<'a>(&'a self, snapshot: &'a Snapshot) -> ManifestListReader<'a> { + ManifestListReader::new( + snapshot, + &self.file_io, + &self.metadata, + self.encryption_manager.as_deref(), + ) + } + /// Create a reader for the table. pub fn reader_builder(&self) -> ArrowReaderBuilder { ArrowReaderBuilder::new(self.file_io.clone(), self.runtime().clone()) @@ -356,9 +404,56 @@ impl StaticTable { } } +/// If the table metadata sets the `encryption.key-id` property, build an +/// [`EncryptionManager`] for the table. +/// +/// Returns `Ok(None)` if the format version is below v3 or the property is +/// not set. Returns an error if the property is set but no +/// [`KeyManagementClient`] was provided. +fn maybe_configure_encryption( + kms_client: Option<&Arc>, + metadata: &TableMetadataRef, +) -> Result>> { + if metadata.format_version() < FormatVersion::V3 { + return Ok(None); + } + + let table_properties = metadata.table_properties()?; + let Some(table_key_id) = table_properties.encryption_key_id else { + if kms_client.is_some() { + tracing::warn!( + "KeyManagementClient provided but table does not have encryption.key-id set" + ); + } + return Ok(None); + }; + + let kms_client = kms_client.ok_or_else(|| { + Error::new( + ErrorKind::PreconditionFailed, + "Table has encryption.key-id set but no KeyManagementClient was provided to TableBuilder", + ) + })?; + + let em = EncryptionManager::builder() + .kms_client(Arc::clone(kms_client)) + .table_key_id(table_key_id) + .encryption_keys(metadata.encryption_keys.clone()) + .key_size(AesKeySize::from_key_length( + table_properties.encryption_data_key_length, + )?) + .build(); + Ok(Some(Arc::new(em))) +} + #[cfg(test)] mod tests { + use std::collections::HashMap; + use super::*; + use crate::encryption::StandardKeyMetadata; + use crate::encryption::kms::MemoryKeyManagementClient; + use crate::spec::{ManifestListWriter, Operation, Snapshot, Summary, TableProperties}; #[tokio::test] async fn test_static_table_from_file() { @@ -432,4 +527,188 @@ mod tests { assert!(!table.readonly()); assert_eq!(table.identifier.name(), "table"); } + + const V3_METADATA: &str = r#"{ + "format-version": 3, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "memory:///table", + "last-sequence-number": 0, + "last-updated-ms": 1602638573590, + "last-column-id": 1, + "current-schema-id": 0, + "schemas": [{"type": "struct", "schema-id": 0, "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], + "refs": {}, + "next-row-id": 0 + }"#; + + const V2_METADATA: &str = r#"{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "memory:///table", + "last-sequence-number": 0, + "last-updated-ms": 1602638573590, + "last-column-id": 1, + "current-schema-id": 0, + "schemas": [{"type": "struct", "schema-id": 0, "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"} + ]}], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 1000, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {}, + "snapshots": [], + "snapshot-log": [], + "metadata-log": [], + "refs": {} + }"#; + + fn make_kms() -> Arc { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + Arc::new(kms) + } + + async fn write_empty_manifest_list_bytes(io: &FileIO, path: &str) -> bytes::Bytes { + let output = io.new_output(path).unwrap(); + let mut writer = ManifestListWriter::v3(output, 1, None, 0, Some(0)); + writer.add_manifests(std::iter::empty()).unwrap(); + writer.close().await.unwrap(); + io.new_input(path).unwrap().read().await.unwrap() + } + + #[tokio::test] + async fn table_decrypts_manifest_list_via_object_cache() { + let io = FileIO::new_with_memory(); + let plain_path = "memory:///table/metadata/manifest-list-plain.avro"; + let encrypted_path = "memory:///table/metadata/manifest-list-enc.avro"; + + // Encrypt a real manifest list onto the encrypted path. + let raw = write_empty_manifest_list_bytes(&io, plain_path).await; + let kms = make_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + let encrypted_output = mgr.encrypt(io.new_output(encrypted_path).unwrap()); + let std_km: StandardKeyMetadata = encrypted_output.key_metadata().clone(); + encrypted_output.write(raw).await.unwrap(); + let key_id = mgr + .encrypt_manifest_list_key_metadata(&std_km) + .await + .unwrap(); + + // Snapshot the wrapped keys (manifest-list entry + KEK) the manager produced. + let encryption_keys = mgr.with_encryption_keys(|keys| keys.clone()); + + // Build a TableMetadata with those keys, the encryption.key-id property, + // and a snapshot whose encryption_key_id points at the wrapped entry. + let mut metadata: TableMetadata = serde_json::from_str(V3_METADATA).unwrap(); + metadata.properties.insert( + TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(), + "master-1".to_string(), + ); + metadata.encryption_keys = encryption_keys; + + let snapshot = Snapshot::builder() + .with_snapshot_id(1) + .with_sequence_number(0) + .with_timestamp_ms(0) + .with_manifest_list(encrypted_path.to_string()) + .with_summary(Summary { + operation: Operation::Append, + additional_properties: HashMap::new(), + }) + .with_schema_id(0) + .with_encryption_key_id(Some(key_id)) + .build(); + let snapshot_ref = Arc::new(snapshot); + metadata + .snapshots + .insert(snapshot_ref.snapshot_id(), snapshot_ref.clone()); + metadata.current_snapshot_id = Some(snapshot_ref.snapshot_id()); + + // Build the table with the KMS client, then read via the object cache. + let table = Table::builder() + .file_io(io) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap()) + .kms_client(kms) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap(); + assert!(table.encryption_manager().is_some()); + + let manifest_list = table + .object_cache() + .get_manifest_list(&snapshot_ref, &table.metadata_ref()) + .await + .unwrap(); + assert_eq!(manifest_list.entries().len(), 0); + } + + #[tokio::test] + async fn table_builder_errors_when_encryption_key_id_set_but_no_kms() { + let mut metadata: TableMetadata = serde_json::from_str(V3_METADATA).unwrap(); + metadata.properties.insert( + TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(), + "master-1".to_string(), + ); + + let err = Table::builder() + .file_io(FileIO::new_with_memory()) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap()) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap_err(); + assert_eq!(err.kind(), ErrorKind::PreconditionFailed); + } + + #[tokio::test] + async fn table_builder_skips_encryption_on_pre_v3_table() { + // Encryption is a v3 spec feature; pre-v3 tables silently skip + // encryption even if encryption.key-id is set. + let mut metadata: TableMetadata = serde_json::from_str(V2_METADATA).unwrap(); + metadata.properties.insert( + TableProperties::PROPERTY_ENCRYPTION_KEY_ID.to_string(), + "master-1".to_string(), + ); + + let table = Table::builder() + .file_io(FileIO::new_with_memory()) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "enc"]).unwrap()) + .kms_client(make_kms()) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap(); + assert!(table.encryption_manager().is_none()); + } + + #[tokio::test] + async fn table_builder_skips_encryption_when_property_absent() { + let metadata: TableMetadata = serde_json::from_str(V2_METADATA).unwrap(); + let table = Table::builder() + .file_io(FileIO::new_with_memory()) + .metadata(metadata) + .identifier(TableIdent::from_strs(["ns", "plain"]).unwrap()) + .kms_client(make_kms()) + .runtime(Runtime::try_current().unwrap()) + .build() + .unwrap(); + assert!(table.encryption_manager().is_none()); + } } diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 08d4032409..ec4ceee277 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -128,11 +128,10 @@ impl SnapshotProduceOperation for FastAppendOperation { return Ok(vec![]); }; - let manifest_list = snapshot - .load_manifest_list( - snapshot_produce.table.file_io(), - &snapshot_produce.table.metadata_ref(), - ) + let manifest_list = snapshot_produce + .table + .manifest_list_reader(snapshot) + .load() .await?; Ok(manifest_list @@ -304,8 +303,9 @@ mod tests { } else { unreachable!() }; - let manifest_list = new_snapshot - .load_manifest_list(table.file_io(), table.metadata()) + let manifest_list = table + .manifest_list_reader(new_snapshot) + .load() .await .unwrap(); assert_eq!(1, manifest_list.entries().len()); diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 04ee1997d0..b68a53e5e3 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -605,13 +605,8 @@ mod test_row_lineage { assert_eq!(table.metadata().next_row_id(), 30); // Check written manifest for first_row_id - let manifest_list = table - .metadata() - .current_snapshot() - .unwrap() - .load_manifest_list(table.file_io(), table.metadata()) - .await - .unwrap(); + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap(); assert_eq!(manifest_list.entries().len(), 1); let manifest_file = &manifest_list.entries()[0]; @@ -633,13 +628,7 @@ mod test_row_lineage { assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11); // Check written manifest for first_row_id - let manifest_list = table - .metadata() - .current_snapshot() - .unwrap() - .load_manifest_list(table.file_io(), table.metadata()) - .await - .unwrap(); + let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap(); assert_eq!(manifest_list.entries().len(), 2); let manifest_file = &manifest_list.entries()[1]; assert_eq!(manifest_file.first_row_id, Some(30)); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f643a7d1e..497467b37c 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -172,8 +172,10 @@ impl<'a> SnapshotProducer<'a> { let mut referenced_files = Vec::new(); if let Some(current_snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = current_snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) + let manifest_list = self + .table + .manifest_list_reader(current_snapshot) + .load() .await?; for manifest_list_entry in manifest_list.entries() { let manifest = manifest_list_entry diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 3b3ff3d6b3..835c804908 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -504,8 +504,9 @@ mod tests { let current_snapshot = updated_table.metadata().current_snapshot().unwrap(); // Load the manifest list to verify the data files were added - let manifest_list = current_snapshot - .load_manifest_list(updated_table.file_io(), updated_table.metadata()) + let manifest_list = updated_table + .manifest_list_reader(current_snapshot) + .load() .await?; // There should be at least one manifest