diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index c51f6a6a89..86a3894255 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -677,12 +677,7 @@ impl Catalog for GlueCatalog { async fn purge_table(&self, table: &TableIdent) -> Result<()> { let table_info = self.load_table(table).await?; self.drop_table(table).await?; - iceberg::drop_table_data( - table_info.file_io(), - table_info.metadata(), - table_info.metadata_location(), - ) - .await + iceberg::drop_table_data(&table_info).await } /// Asynchronously checks the existence of a specified table diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index d778a3d5fc..0f15890c77 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -624,12 +624,7 @@ impl Catalog for HmsCatalog { async fn purge_table(&self, table: &TableIdent) -> Result<()> { let table_info = self.load_table(table).await?; self.drop_table(table).await?; - iceberg::drop_table_data( - table_info.file_io(), - table_info.metadata(), - table_info.metadata_location(), - ) - .await + iceberg::drop_table_data(&table_info).await } /// Asynchronously checks the existence of a specified table diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c7bf9d0cfd..81d6cde591 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -774,12 +774,7 @@ impl Catalog for SqlCatalog { async fn purge_table(&self, table: &TableIdent) -> Result<()> { let table_info = self.load_table(table).await?; self.drop_table(table).await?; - iceberg::drop_table_data( - table_info.file_io(), - table_info.metadata(), - table_info.metadata_location(), - ) - .await + iceberg::drop_table_data(&table_info).await } async fn load_table(&self, identifier: &TableIdent) -> Result { diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index a88c91d625..0008ab8c74 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -2025,6 +2025,9 @@ pub fn iceberg::spec::ManifestList::eq(&self, other: &iceberg::spec::ManifestLis impl core::fmt::Debug for iceberg::spec::ManifestList pub fn iceberg::spec::ManifestList::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl core::marker::StructuralPartialEq for iceberg::spec::ManifestList +pub struct iceberg::spec::ManifestListReader +impl iceberg::spec::ManifestListReader +pub async fn iceberg::spec::ManifestListReader::load(&self) -> iceberg::Result pub struct iceberg::spec::ManifestListWriter impl iceberg::spec::ManifestListWriter pub fn iceberg::spec::ManifestListWriter::add_manifests(&mut self, manifests: impl core::iter::traits::iterator::Iterator) -> iceberg::Result<()> @@ -2340,7 +2343,6 @@ impl iceberg::spec::Snapshot pub fn iceberg::spec::Snapshot::added_rows_count(&self) -> core::option::Option pub fn iceberg::spec::Snapshot::encryption_key_id(&self) -> core::option::Option<&str> pub fn iceberg::spec::Snapshot::first_row_id(&self) -> core::option::Option -pub async fn iceberg::spec::Snapshot::load_manifest_list(&self, file_io: &iceberg::io::FileIO, table_metadata: &iceberg::spec::TableMetadata) -> iceberg::Result pub fn iceberg::spec::Snapshot::manifest_list(&self) -> &str pub fn iceberg::spec::Snapshot::parent_snapshot_id(&self) -> core::option::Option pub fn iceberg::spec::Snapshot::row_range(&self) -> core::option::Option<(u64, u64)> @@ -3014,6 +3016,7 @@ pub fn iceberg::table::Table::current_schema_ref(&self) -> iceberg::spec::Schema pub fn iceberg::table::Table::file_io(&self) -> &iceberg::io::FileIO pub fn iceberg::table::Table::identifier(&self) -> &iceberg::TableIdent pub fn iceberg::table::Table::inspect(&self) -> iceberg::inspect::MetadataTable<'_> +pub fn iceberg::table::Table::manifest_list_reader(&self, snapshot: &iceberg::spec::SnapshotRef) -> iceberg::spec::ManifestListReader pub fn iceberg::table::Table::metadata(&self) -> &iceberg::spec::TableMetadata pub fn iceberg::table::Table::metadata_location(&self) -> core::option::Option<&str> pub fn iceberg::table::Table::metadata_location_result(&self) -> iceberg::Result<&str> @@ -3665,5 +3668,5 @@ pub type iceberg::memory::MemoryCatalogBuilder::C = iceberg::memory::MemoryCatal pub fn iceberg::memory::MemoryCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send pub fn iceberg::memory::MemoryCatalogBuilder::with_runtime(self, runtime: iceberg::Runtime) -> Self pub fn iceberg::memory::MemoryCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self -pub async fn iceberg::drop_table_data(io: &iceberg::io::FileIO, metadata: &iceberg::spec::TableMetadata, metadata_location: core::option::Option<&str>) -> iceberg::Result<()> +pub async fn iceberg::drop_table_data(table_info: &iceberg::table::Table) -> iceberg::Result<()> pub type iceberg::Result = core::result::Result diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 3ae01a23df..67f8ab8dd1 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -343,12 +343,7 @@ impl Catalog for MemoryCatalog { async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> { let table_info = self.load_table(table_ident).await?; self.drop_table(table_ident).await?; - crate::catalog::utils::drop_table_data( - table_info.file_io(), - table_info.metadata(), - table_info.metadata_location(), - ) - .await + crate::catalog::utils::drop_table_data(&table_info).await } /// Check if a table exists in the catalog. diff --git a/crates/iceberg/src/catalog/utils.rs b/crates/iceberg/src/catalog/utils.rs index d450f9df80..853fc8d09a 100644 --- a/crates/iceberg/src/catalog/utils.rs +++ b/crates/iceberg/src/catalog/utils.rs @@ -23,7 +23,7 @@ use futures::{TryStreamExt, stream}; use crate::Result; use crate::io::FileIO; -use crate::spec::TableMetadata; +use crate::table::Table; const DELETE_CONCURRENCY: usize = 10; @@ -36,18 +36,16 @@ const DELETE_CONCURRENCY: usize = 10; /// Data files within manifests are only deleted if the `gc.enabled` table /// property is `true` (the default), to avoid corrupting other tables that /// may share the same data files. -pub async fn drop_table_data( - io: &FileIO, - metadata: &TableMetadata, - metadata_location: Option<&str>, -) -> Result<()> { +pub async fn drop_table_data(table_info: &Table) -> Result<()> { let mut manifest_lists_to_delete: HashSet = HashSet::new(); let mut manifests_to_delete: HashSet = HashSet::new(); + let metadata = table_info.metadata_ref(); + let io = table_info.file_io(); // Load all manifest lists concurrently let results: Vec<_> = futures::future::try_join_all(metadata.snapshots().map(|snapshot| async { - let manifest_list = snapshot.load_manifest_list(io, metadata).await?; + let manifest_list = table_info.manifest_list_reader(snapshot).load().await?; Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list)) })) .await?; @@ -97,7 +95,7 @@ pub async fn drop_table_data( .await?; // Delete the current metadata file - if let Some(location) = metadata_location { + if let Some(location) = table_info.metadata_location() { io.delete(location).await?; } diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs index 4c30ca2ec5..a985786460 100644 --- a/crates/iceberg/src/inspect/manifests.rs +++ b/crates/iceberg/src/inspect/manifests.rs @@ -161,9 +161,7 @@ impl<'a> ManifestsTable<'a> { let mut partition_summaries = self.partition_summary_builder()?; if let Some(snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) - .await?; + let manifest_list = self.table.manifest_list_reader(snapshot).load().await?; for manifest in manifest_list.entries() { content.append_value(manifest.content as i32); path.append_value(manifest.manifest_path.clone()); diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 5de45e2acc..d26adf86f6 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -20,7 +20,8 @@ use std::sync::Arc; use crate::io::FileIO; use crate::spec::{ - FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef, + FormatVersion, Manifest, ManifestFile, ManifestList, ManifestListReader, SchemaId, SnapshotRef, + TableMetadataRef, }; use crate::{Error, ErrorKind, Result}; @@ -126,10 +127,14 @@ impl ObjectCache { table_metadata: &TableMetadataRef, ) -> Result> { if self.cache_disabled { - return snapshot - .load_manifest_list(&self.file_io, table_metadata) - .await - .map(Arc::new); + return ManifestListReader::new( + snapshot.clone(), + self.file_io.clone(), + table_metadata.clone(), + ) + .load() + .await + .map(Arc::new); } let key = CachedObjectKey::ManifestList(( @@ -173,9 +178,13 @@ impl ObjectCache { snapshot: &SnapshotRef, table_metadata: &TableMetadataRef, ) -> Result { - let manifest_list = snapshot - .load_manifest_list(&self.file_io, table_metadata) - .await?; + let manifest_list = ManifestListReader::new( + snapshot.clone(), + self.file_io.clone(), + table_metadata.clone(), + ) + .load() + .await?; Ok(CachedItem::ManifestList(Arc::new(manifest_list))) } diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index baaab1f590..5f5fa3c148 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -28,7 +28,7 @@ use serde_derive::{Deserialize, Serialize}; use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}; use self::_serde::{ManifestFileV1, ManifestFileV2}; -use super::{FormatVersion, Manifest}; +use super::{FormatVersion, Manifest, SnapshotRef, TableMetadataRef}; use crate::error::Result; use crate::io::{FileIO, OutputFile}; use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3; @@ -90,6 +90,41 @@ impl ManifestList { } } +/// A manifest list reader that encapsulates the logic for loading and parsing a [`ManifestList`] +/// from a snapshot. +pub struct ManifestListReader { + snapshot: SnapshotRef, + file_io: FileIO, + table_metadata: TableMetadataRef, +} + +impl ManifestListReader { + pub(crate) fn new( + snapshot: SnapshotRef, + file_io: FileIO, + table_metadata: TableMetadataRef, + ) -> Self { + Self { + snapshot, + file_io, + table_metadata, + } + } + + /// Loads and returns the [`ManifestList`] for this snapshot. + pub async fn load(&self) -> Result { + let manifest_list_content = self + .file_io + .new_input(self.snapshot.manifest_list())? + .read() + .await?; + ManifestList::parse_with_version( + &manifest_list_content, + self.table_metadata.format_version(), + ) + } +} + /// A manifest list writer. pub struct ManifestListWriter { format_version: FormatVersion, diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 3b8a3c934e..7db8f5890f 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -27,8 +27,7 @@ use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; use crate::error::{Result, timestamp_ms_to_utc}; -use crate::io::FileIO; -use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata}; +use crate::spec::{SchemaId, SchemaRef, TableMetadata}; use crate::{Error, ErrorKind}; /// The ref name of the main branch of the table. @@ -194,21 +193,6 @@ impl Snapshot { } } - /// Load manifest list. - pub async fn load_manifest_list( - &self, - file_io: &FileIO, - table_metadata: &TableMetadata, - ) -> Result { - let manifest_list_content = file_io.new_input(&self.manifest_list)?.read().await?; - ManifestList::parse_with_version( - &manifest_list_content, - // TODO: You don't really need the version since you could just project any Avro in - // the version that you'd like to get (probably always the latest) - table_metadata.format_version(), - ) - } - #[allow(dead_code)] pub(crate) fn log(&self) -> SnapshotLog { SnapshotLog { diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index d2ba93f854..f4880509b3 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -25,7 +25,7 @@ use crate::io::FileIO; use crate::io::object_cache::ObjectCache; use crate::runtime::Runtime; use crate::scan::TableScanBuilder; -use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef}; +use crate::spec::{ManifestListReader, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef}; use crate::{Error, ErrorKind, Result, TableIdent}; /// Builder to create table scan. @@ -264,6 +264,15 @@ impl Table { self.metadata.current_schema().clone() } + /// Creates a [`ManifestListReader`] for the given snapshot. + pub fn manifest_list_reader(&self, snapshot: &SnapshotRef) -> ManifestListReader { + ManifestListReader::new( + snapshot.clone(), + self.file_io.clone(), + self.metadata.clone(), + ) + } + /// Create a reader for the table. pub fn reader_builder(&self) -> ArrowReaderBuilder { ArrowReaderBuilder::new(self.file_io.clone(), self.runtime().clone()) diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs index 299973742e..2c2df1f644 100644 --- a/crates/iceberg/src/transaction/append.rs +++ b/crates/iceberg/src/transaction/append.rs @@ -128,11 +128,10 @@ impl SnapshotProduceOperation for FastAppendOperation { return Ok(vec![]); }; - let manifest_list = snapshot - .load_manifest_list( - snapshot_produce.table.file_io(), - &snapshot_produce.table.metadata_ref(), - ) + let manifest_list = snapshot_produce + .table + .manifest_list_reader(snapshot) + .load() .await?; Ok(manifest_list @@ -162,7 +161,8 @@ mod tests { use crate::io::FileIO; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestEntry, - ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata, + ManifestListWriter, ManifestStatus, ManifestWriterBuilder, SnapshotRef, Struct, + TableMetadata, }; use crate::table::Table; use crate::test_utils::test_runtime; @@ -338,14 +338,15 @@ mod tests { let mut action_commit = Arc::new(action).commit(&table).await.unwrap(); let updates = action_commit.take_updates(); - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { - snapshot + let new_snapshot: SnapshotRef = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + SnapshotRef::new(snapshot.clone()) } else { unreachable!("first update of a fast append should be AddSnapshot") }; - let manifest_list = new_snapshot - .load_manifest_list(table.file_io(), table.metadata()) + let manifest_list = table + .manifest_list_reader(&new_snapshot) + .load() .await .unwrap(); @@ -502,13 +503,14 @@ mod tests { ); // check manifest list - let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { - snapshot + let new_snapshot: SnapshotRef = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] { + SnapshotRef::new(snapshot.clone()) } else { unreachable!() }; - let manifest_list = new_snapshot - .load_manifest_list(table.file_io(), table.metadata()) + let manifest_list = table + .manifest_list_reader(&new_snapshot) + .load() .await .unwrap(); assert_eq!(1, manifest_list.entries().len()); diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index 04ee1997d0..b68a53e5e3 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -605,13 +605,8 @@ mod test_row_lineage { assert_eq!(table.metadata().next_row_id(), 30); // Check written manifest for first_row_id - let manifest_list = table - .metadata() - .current_snapshot() - .unwrap() - .load_manifest_list(table.file_io(), table.metadata()) - .await - .unwrap(); + let snapshot = table.metadata().current_snapshot().unwrap(); + let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap(); assert_eq!(manifest_list.entries().len(), 1); let manifest_file = &manifest_list.entries()[0]; @@ -633,13 +628,7 @@ mod test_row_lineage { assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11); // Check written manifest for first_row_id - let manifest_list = table - .metadata() - .current_snapshot() - .unwrap() - .load_manifest_list(table.file_io(), table.metadata()) - .await - .unwrap(); + let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap(); assert_eq!(manifest_list.entries().len(), 2); let manifest_file = &manifest_list.entries()[1]; assert_eq!(manifest_file.first_row_id, Some(30)); diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f643a7d1e..497467b37c 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -172,8 +172,10 @@ impl<'a> SnapshotProducer<'a> { let mut referenced_files = Vec::new(); if let Some(current_snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = current_snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) + let manifest_list = self + .table + .manifest_list_reader(current_snapshot) + .load() .await?; for manifest_list_entry in manifest_list.entries() { let manifest = manifest_list_entry diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 3b3ff3d6b3..835c804908 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -504,8 +504,9 @@ mod tests { let current_snapshot = updated_table.metadata().current_snapshot().unwrap(); // Load the manifest list to verify the data files were added - let manifest_list = current_snapshot - .load_manifest_list(updated_table.file_io(), updated_table.metadata()) + let manifest_list = updated_table + .manifest_list_reader(current_snapshot) + .load() .await?; // There should be at least one manifest