Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,7 @@ impl Catalog for GlueCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,7 @@ impl Catalog for HmsCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ impl Catalog for SqlCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
50 changes: 50 additions & 0 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7506,6 +7506,55 @@ impl<U> bnum::cast::As for iceberg::spec::ManifestList
pub fn iceberg::spec::ManifestList::as_<T>(self) -> T where T: bnum::cast::CastFrom<U>
impl<V, T> ppv_lite86::types::VZip<V> for iceberg::spec::ManifestList where V: ppv_lite86::types::MultiLane<T>
pub fn iceberg::spec::ManifestList::vzip(self) -> V
pub struct iceberg::spec::ManifestListReader<'a>
impl<'a> iceberg::spec::ManifestListReader<'a>
pub async fn iceberg::spec::ManifestListReader<'a>::load(&self) -> iceberg::Result<iceberg::spec::ManifestList>
impl<'a> core::marker::Freeze for iceberg::spec::ManifestListReader<'a>
impl<'a> core::marker::Send for iceberg::spec::ManifestListReader<'a>
impl<'a> core::marker::Sync for iceberg::spec::ManifestListReader<'a>
impl<'a> core::marker::Unpin for iceberg::spec::ManifestListReader<'a>
impl<'a> !core::panic::unwind_safe::RefUnwindSafe for iceberg::spec::ManifestListReader<'a>
impl<'a> !core::panic::unwind_safe::UnwindSafe for iceberg::spec::ManifestListReader<'a>
impl<T, U> core::convert::Into<U> for iceberg::spec::ManifestListReader<'a> where U: core::convert::From<T>
pub fn iceberg::spec::ManifestListReader<'a>::into(self) -> U
impl<T, U> core::convert::TryFrom<U> for iceberg::spec::ManifestListReader<'a> where U: core::convert::Into<T>
pub type iceberg::spec::ManifestListReader<'a>::Error = core::convert::Infallible
pub fn iceberg::spec::ManifestListReader<'a>::try_from(value: U) -> core::result::Result<T, <T as core::convert::TryFrom<U>>::Error>
impl<T, U> core::convert::TryInto<U> for iceberg::spec::ManifestListReader<'a> where U: core::convert::TryFrom<T>
pub type iceberg::spec::ManifestListReader<'a>::Error = <U as core::convert::TryFrom<T>>::Error
pub fn iceberg::spec::ManifestListReader<'a>::try_into(self) -> core::result::Result<U, <U as core::convert::TryFrom<T>>::Error>
impl<T> as_any::AsAny for iceberg::spec::ManifestListReader<'a> where T: core::any::Any
pub fn iceberg::spec::ManifestListReader<'a>::as_any(&self) -> &(dyn core::any::Any + 'static)
pub fn iceberg::spec::ManifestListReader<'a>::as_any_mut(&mut self) -> &mut (dyn core::any::Any + 'static)
pub fn iceberg::spec::ManifestListReader<'a>::type_name(&self) -> &'static str
impl<T> as_any::Downcast for iceberg::spec::ManifestListReader<'a> where T: as_any::AsAny + ?core::marker::Sized
impl<T> core::any::Any for iceberg::spec::ManifestListReader<'a> where T: 'static + ?core::marker::Sized
pub fn iceberg::spec::ManifestListReader<'a>::type_id(&self) -> core::any::TypeId
impl<T> core::borrow::Borrow<T> for iceberg::spec::ManifestListReader<'a> where T: ?core::marker::Sized
pub fn iceberg::spec::ManifestListReader<'a>::borrow(&self) -> &T
impl<T> core::borrow::BorrowMut<T> for iceberg::spec::ManifestListReader<'a> where T: ?core::marker::Sized
pub fn iceberg::spec::ManifestListReader<'a>::borrow_mut(&mut self) -> &mut T
impl<T> core::convert::From<T> for iceberg::spec::ManifestListReader<'a>
pub fn iceberg::spec::ManifestListReader<'a>::from(t: T) -> T
impl<T> crossbeam_epoch::atomic::Pointable for iceberg::spec::ManifestListReader<'a>
pub type iceberg::spec::ManifestListReader<'a>::Init = T
pub const iceberg::spec::ManifestListReader<'a>::ALIGN: usize
pub unsafe fn iceberg::spec::ManifestListReader<'a>::deref<'a>(ptr: usize) -> &'a T
pub unsafe fn iceberg::spec::ManifestListReader<'a>::deref_mut<'a>(ptr: usize) -> &'a mut T
pub unsafe fn iceberg::spec::ManifestListReader<'a>::drop(ptr: usize)
pub unsafe fn iceberg::spec::ManifestListReader<'a>::init(init: <T as crossbeam_epoch::atomic::Pointable>::Init) -> usize
impl<T> either::into_either::IntoEither for iceberg::spec::ManifestListReader<'a>
impl<T> tower_http::follow_redirect::policy::PolicyExt for iceberg::spec::ManifestListReader<'a> where T: ?core::marker::Sized
pub fn iceberg::spec::ManifestListReader<'a>::and<P, B, E>(self, other: P) -> tower_http::follow_redirect::policy::and::And<T, P> where T: tower_http::follow_redirect::policy::Policy<B, E>, P: tower_http::follow_redirect::policy::Policy<B, E>
pub fn iceberg::spec::ManifestListReader<'a>::or<P, B, E>(self, other: P) -> tower_http::follow_redirect::policy::or::Or<T, P> where T: tower_http::follow_redirect::policy::Policy<B, E>, P: tower_http::follow_redirect::policy::Policy<B, E>
impl<T> tracing::instrument::Instrument for iceberg::spec::ManifestListReader<'a>
impl<T> tracing::instrument::WithSubscriber for iceberg::spec::ManifestListReader<'a>
impl<T> typenum::type_operators::Same for iceberg::spec::ManifestListReader<'a>
pub type iceberg::spec::ManifestListReader<'a>::Output = T
impl<U> bnum::cast::As for iceberg::spec::ManifestListReader<'a>
pub fn iceberg::spec::ManifestListReader<'a>::as_<T>(self) -> T where T: bnum::cast::CastFrom<U>
impl<V, T> ppv_lite86::types::VZip<V> for iceberg::spec::ManifestListReader<'a> where V: ppv_lite86::types::MultiLane<T>
pub fn iceberg::spec::ManifestListReader<'a>::vzip(self) -> V
pub struct iceberg::spec::ManifestListWriter
impl iceberg::spec::ManifestListWriter
pub fn iceberg::spec::ManifestListWriter::add_manifests(&mut self, manifests: impl core::iter::traits::iterator::Iterator<Item = iceberg::spec::ManifestFile>) -> iceberg::Result<()>
Expand Down Expand Up @@ -11004,6 +11053,7 @@ pub fn iceberg::table::Table::current_schema_ref(&self) -> iceberg::spec::Schema
pub fn iceberg::table::Table::file_io(&self) -> &iceberg::io::FileIO
pub fn iceberg::table::Table::identifier(&self) -> &iceberg::TableIdent
pub fn iceberg::table::Table::inspect(&self) -> iceberg::inspect::MetadataTable<'_>
pub fn iceberg::table::Table::manifest_list_reader<'a>(&'a self, snapshot: &'a iceberg::spec::Snapshot) -> iceberg::spec::ManifestListReader<'a>
pub fn iceberg::table::Table::metadata(&self) -> &iceberg::spec::TableMetadata
pub fn iceberg::table::Table::metadata_location(&self) -> core::option::Option<&str>
pub fn iceberg::table::Table::metadata_location_result(&self) -> iceberg::Result<&str>
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl Catalog for MemoryCatalog {
table_info.file_io(),
table_info.metadata(),
table_info.metadata_location(),
table_info.encryption_manager(),
)
.await
}
Expand Down
8 changes: 6 additions & 2 deletions crates/iceberg/src/catalog/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use std::collections::HashSet;
use futures::{TryStreamExt, stream};

use crate::Result;
use crate::encryption::EncryptionManager;
use crate::io::FileIO;
use crate::spec::TableMetadata;
use crate::spec::{ManifestListReader, TableMetadata};

const DELETE_CONCURRENCY: usize = 10;

Expand All @@ -40,14 +41,17 @@ pub async fn drop_table_data(
io: &FileIO,
metadata: &TableMetadata,
metadata_location: Option<&str>,
encryption_manager: Option<&EncryptionManager>,
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a public API change.

) -> Result<()> {
let mut manifest_lists_to_delete: HashSet<String> = HashSet::new();
let mut manifests_to_delete: HashSet<String> = HashSet::new();

// Load all manifest lists concurrently
let results: Vec<_> =
futures::future::try_join_all(metadata.snapshots().map(|snapshot| async {
let manifest_list = snapshot.load_manifest_list(io, metadata).await?;
let manifest_list = ManifestListReader::new(snapshot, io, metadata, encryption_manager)
.load()
.await?;
Ok::<_, crate::Error>((snapshot.manifest_list().to_string(), manifest_list))
}))
.await?;
Expand Down
4 changes: 1 addition & 3 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,7 @@ impl<'a> ManifestsTable<'a> {
let mut partition_summaries = self.partition_summary_builder()?;

if let Some(snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
let manifest_list = self.table.manifest_list_reader(snapshot).load().await?;
for manifest in manifest_list.entries() {
content.append_value(manifest.content as i32);
path.append_value(manifest.manifest_path.clone());
Expand Down
52 changes: 37 additions & 15 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
use std::mem::size_of_val;
use std::sync::Arc;

use crate::encryption::EncryptionManager;
use crate::io::FileIO;
use crate::spec::{
FormatVersion, Manifest, ManifestFile, ManifestList, SchemaId, SnapshotRef, TableMetadataRef,
FormatVersion, Manifest, ManifestFile, ManifestList, ManifestListReader, SchemaId, SnapshotRef,
TableMetadataRef,
};
use crate::{Error, ErrorKind, Result};

Expand All @@ -44,20 +46,25 @@ pub struct ObjectCache {
cache: moka::future::Cache<CachedObjectKey, CachedItem>,
file_io: FileIO,
cache_disabled: bool,
encryption_manager: Option<Arc<EncryptionManager>>,
}

impl ObjectCache {
/// Creates a new [`ObjectCache`]
/// with the default cache size
pub(crate) fn new(file_io: FileIO) -> Self {
Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES)
pub(crate) fn new(file_io: FileIO, encryption_manager: Option<Arc<EncryptionManager>>) -> Self {
Self::new_with_capacity(file_io, DEFAULT_CACHE_SIZE_BYTES, encryption_manager)
}

/// Creates a new [`ObjectCache`]
/// with a specific cache size
pub(crate) fn new_with_capacity(file_io: FileIO, cache_size_bytes: u64) -> Self {
pub(crate) fn new_with_capacity(
file_io: FileIO,
cache_size_bytes: u64,
encryption_manager: Option<Arc<EncryptionManager>>,
) -> Self {
if cache_size_bytes == 0 {
Self::with_disabled_cache(file_io)
Self::with_disabled_cache(file_io, encryption_manager)
} else {
Self {
cache: moka::future::Cache::builder()
Expand All @@ -69,17 +76,22 @@ impl ObjectCache {
.build(),
file_io,
cache_disabled: false,
encryption_manager,
}
}
}

/// Creates a new [`ObjectCache`]
/// with caching disabled
pub(crate) fn with_disabled_cache(file_io: FileIO) -> Self {
pub(crate) fn with_disabled_cache(
file_io: FileIO,
encryption_manager: Option<Arc<EncryptionManager>>,
) -> Self {
Self {
cache: moka::future::Cache::new(0),
file_io,
cache_disabled: true,
encryption_manager,
}
}

Expand Down Expand Up @@ -126,10 +138,15 @@ impl ObjectCache {
table_metadata: &TableMetadataRef,
) -> Result<Arc<ManifestList>> {
if self.cache_disabled {
return snapshot
.load_manifest_list(&self.file_io, table_metadata)
.await
.map(Arc::new);
return ManifestListReader::new(
snapshot,
&self.file_io,
table_metadata,
self.encryption_manager.as_deref(),
)
.load()
.await
.map(Arc::new);
}

let key = CachedObjectKey::ManifestList((
Expand Down Expand Up @@ -173,9 +190,14 @@ impl ObjectCache {
snapshot: &SnapshotRef,
table_metadata: &TableMetadataRef,
) -> Result<CachedItem> {
let manifest_list = snapshot
.load_manifest_list(&self.file_io, table_metadata)
.await?;
let manifest_list = ManifestListReader::new(
snapshot,
&self.file_io,
table_metadata,
self.encryption_manager.as_deref(),
)
.load()
.await?;

Ok(CachedItem::ManifestList(Arc::new(manifest_list)))
}
Expand Down Expand Up @@ -319,7 +341,7 @@ mod tests {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone());
let object_cache = ObjectCache::with_disabled_cache(fixture.table.file_io().clone(), None);

let result_manifest_list = object_cache
.get_manifest_list(
Expand Down Expand Up @@ -352,7 +374,7 @@ mod tests {
let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

let object_cache = ObjectCache::new(fixture.table.file_io().clone());
let object_cache = ObjectCache::new(fixture.table.file_io().clone(), None);

// not in cache
let result_manifest_list = object_cache
Expand Down
58 changes: 57 additions & 1 deletion crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use serde_derive::{Deserialize, Serialize};

use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2};
use self::_serde::{ManifestFileV1, ManifestFileV2};
use super::{FormatVersion, Manifest};
use super::{FormatVersion, Manifest, Snapshot, TableMetadata};
use crate::encryption::{EncryptedInputFile, EncryptionManager};
use crate::error::Result;
use crate::io::{FileIO, OutputFile};
use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3;
Expand Down Expand Up @@ -90,6 +91,61 @@ impl ManifestList {
}
}

/// A manifest list reader that encapsulates the logic for loading and parsing a [`ManifestList`]
/// from a snapshot.
///
/// All fields are borrowed for `'a`; the reader owns nothing and only ties together the inputs
/// that `load` needs.
pub struct ManifestListReader<'a> {
/// Snapshot whose manifest list location and optional encryption key id are read by `load`.
snapshot: &'a Snapshot,
/// File IO used to open the manifest list file as an input.
file_io: &'a FileIO,
/// Table metadata; `load` reads its format version to choose how the content is parsed.
table_metadata: &'a TableMetadata,
/// Encryption manager used to decrypt the manifest list key metadata when the snapshot
/// carries an encryption key id. When this is `None` and the snapshot has a key id,
/// `load` returns an error.
encryption_manager: Option<&'a EncryptionManager>,
}

impl<'a> ManifestListReader<'a> {
    /// Creates a reader over borrowed inputs.
    ///
    /// `encryption_manager` may be `None`; in that case [`Self::load`] fails for any snapshot
    /// that carries an encryption key id.
    pub(crate) fn new(
        snapshot: &'a Snapshot,
        file_io: &'a FileIO,
        table_metadata: &'a TableMetadata,
        encryption_manager: Option<&'a EncryptionManager>,
    ) -> Self {
        Self {
            snapshot,
            file_io,
            table_metadata,
            encryption_manager,
        }
    }

    /// Reads the snapshot's manifest list file and parses it into a [`ManifestList`].
    ///
    /// If the snapshot has an encryption key id, the key metadata is first obtained from the
    /// encryption manager and the file is read through an [`EncryptedInputFile`]; otherwise
    /// the file is read directly through the file IO.
    ///
    /// # Errors
    ///
    /// Returns a `PreconditionFailed` error when the snapshot has an encryption key id but no
    /// encryption manager was supplied. Errors from key-metadata decryption, opening or
    /// reading the input, and parsing are propagated unchanged.
    pub async fn load(&self) -> Result<ManifestList> {
        let raw_bytes = if let Some(key_id) = self.snapshot.encryption_key_id() {
            // An encrypted manifest list cannot be read without a manager to unwrap its key.
            let Some(manager) = self.encryption_manager else {
                return Err(Error::new(
                    ErrorKind::PreconditionFailed,
                    "Snapshot has encryption_key_id but no EncryptionManager configured on Table",
                ));
            };

            let key_metadata = manager.decrypt_manifest_list_key_metadata(key_id).await?;
            let encrypted_source = self.file_io.new_input(self.snapshot.manifest_list())?;
            EncryptedInputFile::new(encrypted_source, key_metadata)
                .read()
                .await?
        } else {
            // No key id on the snapshot: read the file as-is.
            let plain_source = self.file_io.new_input(self.snapshot.manifest_list())?;
            plain_source.read().await?
        };

        let format_version = self.table_metadata.format_version();
        ManifestList::parse_with_version(&raw_bytes, format_version)
    }
}

/// A manifest list writer.
pub struct ManifestListWriter {
format_version: FormatVersion,
Expand Down
Loading
Loading