diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f643a7d1e..50a1b22f44 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,6 +19,8 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; +use futures::TryStreamExt; +use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; @@ -164,29 +166,44 @@ impl<'a> SnapshotProducer<'a> { } pub(crate) async fn validate_duplicate_files(&self) -> Result<()> { + let Some(current_snapshot) = self.table.metadata().current_snapshot() else { + return Ok(()); + }; + let new_files: HashSet<&str> = self .added_data_files .iter() .map(|df| df.file_path.as_str()) .collect(); - let mut referenced_files = Vec::new(); - if let Some(current_snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = current_snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) - .await?; - for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry - .load_manifest(self.table.file_io()) - .await?; - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); - } - } - } - } + let runtime = self.table.runtime(); + let file_io = self.table.file_io(); + let manifest_list = current_snapshot + .load_manifest_list(file_io, &self.table.metadata_ref()) + .await?; + + let new_files_ref = &new_files; + let referenced_files: Vec = manifest_list + .consume_entries() + .into_iter() + .map(|entry| { + let file_io = file_io.clone(); + runtime + .io() + .spawn(async move { entry.load_manifest(&file_io).await }) + }) + .collect::>() + .try_fold(Vec::new(), |mut acc, manifest| async move { + acc.extend( + manifest? + .entries() + .iter() + .filter(|e| new_files_ref.contains(e.file_path()) && e.is_alive()) + .map(|e| e.file_path().to_string()), + ); + Ok(acc) + }) + .await?; if !referenced_files.is_empty() { return Err(Error::new(