Skip to content
7 changes: 1 addition & 6 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,12 +677,7 @@ impl Catalog for GlueCatalog {
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
let table_info = self.load_table(table).await?;
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
iceberg::drop_table_data(&table_info).await
}

/// Asynchronously checks the existence of a specified table
Expand Down
7 changes: 1 addition & 6 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,12 +624,7 @@ impl Catalog for HmsCatalog {
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
let table_info = self.load_table(table).await?;
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
iceberg::drop_table_data(&table_info).await
}

/// Asynchronously checks the existence of a specified table
Expand Down
7 changes: 1 addition & 6 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -774,12 +774,7 @@ impl Catalog for SqlCatalog {
async fn purge_table(&self, table: &TableIdent) -> Result<()> {
let table_info = self.load_table(table).await?;
self.drop_table(table).await?;
iceberg::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
iceberg::drop_table_data(&table_info).await
}

async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
Expand Down
7 changes: 5 additions & 2 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,9 @@ pub fn iceberg::spec::ManifestList::eq(&self, other: &iceberg::spec::ManifestLis
impl core::fmt::Debug for iceberg::spec::ManifestList
pub fn iceberg::spec::ManifestList::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::marker::StructuralPartialEq for iceberg::spec::ManifestList
pub struct iceberg::spec::ManifestListReader
impl iceberg::spec::ManifestListReader
pub async fn iceberg::spec::ManifestListReader::load(&self) -> iceberg::Result<iceberg::spec::ManifestList>
pub struct iceberg::spec::ManifestListWriter
impl iceberg::spec::ManifestListWriter
pub fn iceberg::spec::ManifestListWriter::add_manifests(&mut self, manifests: impl core::iter::traits::iterator::Iterator<Item = iceberg::spec::ManifestFile>) -> iceberg::Result<()>
Expand Down Expand Up @@ -2340,7 +2343,6 @@ impl iceberg::spec::Snapshot
pub fn iceberg::spec::Snapshot::added_rows_count(&self) -> core::option::Option<u64>
pub fn iceberg::spec::Snapshot::encryption_key_id(&self) -> core::option::Option<&str>
pub fn iceberg::spec::Snapshot::first_row_id(&self) -> core::option::Option<u64>
pub async fn iceberg::spec::Snapshot::load_manifest_list(&self, file_io: &iceberg::io::FileIO, table_metadata: &iceberg::spec::TableMetadata) -> iceberg::Result<iceberg::spec::ManifestList>
pub fn iceberg::spec::Snapshot::manifest_list(&self) -> &str
pub fn iceberg::spec::Snapshot::parent_snapshot_id(&self) -> core::option::Option<i64>
pub fn iceberg::spec::Snapshot::row_range(&self) -> core::option::Option<(u64, u64)>
Expand Down Expand Up @@ -3014,6 +3016,7 @@ pub fn iceberg::table::Table::current_schema_ref(&self) -> iceberg::spec::Schema
pub fn iceberg::table::Table::file_io(&self) -> &iceberg::io::FileIO
pub fn iceberg::table::Table::identifier(&self) -> &iceberg::TableIdent
pub fn iceberg::table::Table::inspect(&self) -> iceberg::inspect::MetadataTable<'_>
pub fn iceberg::table::Table::manifest_list_reader(&self, snapshot: &iceberg::spec::SnapshotRef) -> iceberg::spec::ManifestListReader
pub fn iceberg::table::Table::metadata(&self) -> &iceberg::spec::TableMetadata
pub fn iceberg::table::Table::metadata_location(&self) -> core::option::Option<&str>
pub fn iceberg::table::Table::metadata_location_result(&self) -> iceberg::Result<&str>
Expand Down Expand Up @@ -3665,5 +3668,5 @@ pub type iceberg::memory::MemoryCatalogBuilder::C = iceberg::memory::MemoryCatal
pub fn iceberg::memory::MemoryCatalogBuilder::load(self, name: impl core::convert::Into<alloc::string::String>, props: std::collections::hash::map::HashMap<alloc::string::String, alloc::string::String>) -> impl core::future::future::Future<Output = iceberg::Result<Self::C>> + core::marker::Send
pub fn iceberg::memory::MemoryCatalogBuilder::with_runtime(self, runtime: iceberg::Runtime) -> Self
pub fn iceberg::memory::MemoryCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc<dyn iceberg::io::StorageFactory>) -> Self
pub async fn iceberg::drop_table_data(io: &iceberg::io::FileIO, metadata: &iceberg::spec::TableMetadata, metadata_location: core::option::Option<&str>) -> iceberg::Result<()>
pub async fn iceberg::drop_table_data(table_info: &iceberg::table::Table) -> iceberg::Result<()>
pub type iceberg::Result<T> = core::result::Result<T, iceberg::Error>
7 changes: 1 addition & 6 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,7 @@ impl Catalog for MemoryCatalog {
async fn purge_table(&self, table_ident: &TableIdent) -> Result<()> {
let table_info = self.load_table(table_ident).await?;
self.drop_table(table_ident).await?;
crate::catalog::utils::drop_table_data(
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
)
.await
crate::catalog::utils::drop_table_data(&table_info).await
}

/// Check if a table exists in the catalog.
Expand Down
14 changes: 6 additions & 8 deletions crates/iceberg/src/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::{TryStreamExt, stream};

use crate::Result;
use crate::io::FileIO;
use crate::spec::TableMetadata;
use crate::table::Table;

const DELETE_CONCURRENCY: usize = 10;

Expand All @@ -36,18 +36,16 @@ const DELETE_CONCURRENCY: usize = 10;
/// Data files within manifests are only deleted if the `gc.enabled` table
/// property is `true` (the default), to avoid corrupting other tables that
/// may share the same data files.
pub async fn drop_table_data(
io: &FileIO,
metadata: &TableMetadata,
metadata_location: Option<&str>,
) -> Result<()> {
pub async fn drop_table_data(table_info: &Table) -> Result<()> {
let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
let mut manifests_to_delete: HashSet<String> = HashSet::new();

let metadata = table_info.metadata_ref();
let io = table_info.file_io();
// Load all manifest lists concurrently
let results: Vec<_> =
futures::future::try_join_all(metadata.snapshots().map(|snapshot| async {
let manifest_list = snapshot.load_manifest_list(io, metadata).await?;
let manifest_list = table_info.manifest_list_reader(snapshot).load().await?;
Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list))
}))
.await?;
Expand Down Expand Up @@ -97,7 +95,7 @@ pub async fn drop_table_data(
.await?;

// Delete the current metadata file
if let Some(location) = metadata_location {
if let Some(location) = table_info.metadata_location() {
io.delete(location).await?;
}

Expand Down
4 changes: 1 addition & 3 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ impl<'a> ManifestsTable<'a> {
let mut partition_summaries = self.partition_summary_builder()?;

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
let manifest_list = self.table.manifest_list_reader(snapshot).load().await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i32);
path.append_value(manifest.manifest_path.clone());
Expand Down
25 changes: 17 additions & 8 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ use std::sync::Arc;

use crate::io::FileIO;
use crate::spec::{
FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
FormatVersion, Manifest, ManifestFile, ManifestList, ManifestListReader, SchemaId, SnapshotRef,
TableMetadataRef,
};
use crate::{Error, ErrorKind, Result};

Expand Down Expand Up @@ -126,10 +127,14 @@ impl ObjectCache {
table_metadata: &TableMetadataRef,
) -> Result<Arc<ManifestList>> {
if self.cache_disabled {
return snapshot
.load_manifest_list(&self.file_io, table_metadata)
.await
.map(Arc::new);
return ManifestListReader::new(
snapshot.clone(),
self.file_io.clone(),
table_metadata.clone(),
)
.load()
.await
.map(Arc::new);
}

let key = CachedObjectKey::ManifestList((
Expand Down Expand Up @@ -173,9 +178,13 @@ impl ObjectCache {
snapshot: &SnapshotRef,
table_metadata: &TableMetadataRef,
) -> Result<CachedItem> {
let manifest_list = snapshot
.load_manifest_list(&self.file_io, table_metadata)
.await?;
let manifest_list = ManifestListReader::new(
snapshot.clone(),
self.file_io.clone(),
table_metadata.clone(),
)
.load()
.await?;

Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
}
Expand Down
37 changes: 36 additions & 1 deletion crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use serde_derive::{Deserialize, Serialize};

use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2};
use self::_serde::{ManifestFileV1, ManifestFileV2};
use super::{FormatVersion, Manifest};
use super::{FormatVersion, Manifest, SnapshotRef, TableMetadataRef};
use crate::error::Result;
use crate::io::{FileIO, OutputFile};
use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
Expand Down Expand Up @@ -90,6 +90,41 @@ impl ManifestList {
}
}

/// A manifest list reader that encapsulates the logic for loading and parsing a [`ManifestList`]
/// from a snapshot.
///
/// The reader bundles the three inputs that [`ManifestListReader::load`] consumes, so `load`
/// itself takes no arguments besides `&self`.
pub struct ManifestListReader {
/// Snapshot whose `manifest_list()` location is opened by `load`.
snapshot: SnapshotRef,
/// File IO handle through which `load` opens and reads the manifest list file.
file_io: FileIO,
/// Table metadata from which `load` takes the format version used for parsing.
table_metadata: TableMetadataRef,
}

impl ManifestListReader {
pub(crate) fn new(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can limit the visibility to the table module?

snapshot: SnapshotRef,
file_io: FileIO,
table_metadata: TableMetadataRef,
) -> Self {
Self {
snapshot,
file_io,
table_metadata,
}
}

/// Reads the manifest list file referenced by this reader's snapshot and parses it into a
/// [`ManifestList`].
///
/// The file is opened through the reader's file IO at the snapshot's manifest list location,
/// read in full, and parsed using the format version reported by the table metadata.
///
/// # Errors
///
/// Propagates any error from opening or reading the input, and from parsing the content.
pub async fn load(&self) -> Result<ManifestList> {
    let location = self.snapshot.manifest_list();
    let input = self.file_io.new_input(location)?;
    let bytes = input.read().await?;

    let format_version = self.table_metadata.format_version();
    ManifestList::parse_with_version(&bytes, format_version)
}
}

/// A manifest list writer.
pub struct ManifestListWriter {
format_version: FormatVersion,
Expand Down
18 changes: 1 addition & 17 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use typed_builder::TypedBuilder;

use super::table_metadata::SnapshotLog;
use crate::error::{Result, timestamp_ms_to_utc};
use crate::io::FileIO;
use crate::spec::{ManifestList, SchemaId, SchemaRef, TableMetadata};
use crate::spec::{SchemaId, SchemaRef, TableMetadata};
use crate::{Error, ErrorKind};

/// The ref name of the main branch of the table.
Expand Down Expand Up @@ -194,21 +193,6 @@ impl Snapshot {
}
}

/// Reads this snapshot's manifest list file through `file_io` and parses it into a
/// [`ManifestList`] using the format version of `table_metadata`.
///
/// # Errors
///
/// Propagates any error from opening or reading the input, and from parsing the content.
pub async fn load_manifest_list(
    &self,
    file_io: &FileIO,
    table_metadata: &TableMetadata,
) -> Result<ManifestList> {
    let input = file_io.new_input(&self.manifest_list)?;
    let bytes = input.read().await?;

    // TODO: The version is not really needed here, since any Avro could be projected into
    // whichever version is wanted (probably always the latest).
    let format_version = table_metadata.format_version();
    ManifestList::parse_with_version(&bytes, format_version)
}

#[allow(dead_code)]
pub(crate) fn log(&self) -> SnapshotLog {
SnapshotLog {
Expand Down
11 changes: 10 additions & 1 deletion crates/iceberg/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::io::FileIO;
use crate::io::object_cache::ObjectCache;
use crate::runtime::Runtime;
use crate::scan::TableScanBuilder;
use crate::spec::{SchemaRef, TableMetadata, TableMetadataRef};
use crate::spec::{ManifestListReader, SchemaRef, SnapshotRef, TableMetadata, TableMetadataRef};
use crate::{Error, ErrorKind, Result, TableIdent};

/// Builder to create table scan.
Expand Down Expand Up @@ -264,6 +264,15 @@ impl Table {
self.metadata.current_schema().clone()
}

/// Builds a [`ManifestListReader`] for `snapshot`.
///
/// The reader receives its own clones of the snapshot reference, this table's file IO, and
/// this table's metadata reference.
pub fn manifest_list_reader(&self, snapshot: &SnapshotRef) -> ManifestListReader {
    let snapshot = snapshot.clone();
    let file_io = self.file_io.clone();
    let table_metadata = self.metadata.clone();

    ManifestListReader::new(snapshot, file_io, table_metadata)
}

/// Create a reader for the table.
pub fn reader_builder(&self) -> ArrowReaderBuilder {
ArrowReaderBuilder::new(self.file_io.clone(), self.runtime().clone())
Expand Down
30 changes: 16 additions & 14 deletions crates/iceberg/src/transaction/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,10 @@ impl SnapshotProduceOperation for FastAppendOperation {
return Ok(vec![]);
};

let manifest_list = snapshot
.load_manifest_list(
snapshot_produce.table.file_io(),
&snapshot_produce.table.metadata_ref(),
)
let manifest_list = snapshot_produce
.table
.manifest_list_reader(snapshot)
.load()
.await?;

Ok(manifest_list
Expand Down Expand Up @@ -162,7 +161,8 @@ mod tests {
use crate::io::FileIO;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, MAIN_BRANCH, ManifestEntry,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, Struct, TableMetadata,
ManifestListWriter, ManifestStatus, ManifestWriterBuilder, SnapshotRef, Struct,
TableMetadata,
};
use crate::table::Table;
use crate::test_utils::test_runtime;
Expand Down Expand Up @@ -338,14 +338,15 @@ mod tests {
let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
let updates = action_commit.take_updates();

let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
snapshot
let new_snapshot: SnapshotRef = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
SnapshotRef::new(snapshot.clone())
} else {
unreachable!("first update of a fast append should be AddSnapshot")
};

let manifest_list = new_snapshot
.load_manifest_list(table.file_io(), table.metadata())
let manifest_list = table
.manifest_list_reader(&new_snapshot)
.load()
.await
.unwrap();

Expand Down Expand Up @@ -502,13 +503,14 @@ mod tests {
);

// check manifest list
let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
snapshot
let new_snapshot: SnapshotRef = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
SnapshotRef::new(snapshot.clone())
} else {
unreachable!()
};
let manifest_list = new_snapshot
.load_manifest_list(table.file_io(), table.metadata())
let manifest_list = table
.manifest_list_reader(&new_snapshot)
.load()
.await
.unwrap();
assert_eq!(1, manifest_list.entries().len());
Expand Down
17 changes: 3 additions & 14 deletions crates/iceberg/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -605,13 +605,8 @@ mod test_row_lineage {
assert_eq!(table.metadata().next_row_id(), 30);

// Check written manifest for first_row_id
let manifest_list = table
.metadata()
.current_snapshot()
.unwrap()
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();
let snapshot = table.metadata().current_snapshot().unwrap();
let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap();

assert_eq!(manifest_list.entries().len(), 1);
let manifest_file = &manifest_list.entries()[0];
Expand All @@ -633,13 +628,7 @@ mod test_row_lineage {
assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

// Check written manifest for first_row_id
let manifest_list = table
.metadata()
.current_snapshot()
.unwrap()
.load_manifest_list(table.file_io(), table.metadata())
.await
.unwrap();
let manifest_list = table.manifest_list_reader(snapshot).load().await.unwrap();
assert_eq!(manifest_list.entries().len(), 2);
let manifest_file = &manifest_list.entries()[1];
assert_eq!(manifest_file.first_row_id, Some(30));
Expand Down
Loading
Loading