From 1d10f6b855f42478d20b76ad34c3a27e0a0f58d8 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Mon, 30 Mar 2026 14:31:15 +0300 Subject: [PATCH 01/11] Implement futures unordered when checking --- crates/iceberg/src/transaction/snapshot.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..8c6c5f1501 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,6 +19,8 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; +use futures::TryStreamExt; +use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; @@ -175,10 +177,15 @@ impl<'a> SnapshotProducer<'a> { let manifest_list = current_snapshot .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - for manifest_list_entry in manifest_list.entries() { - let manifest = manifest_list_entry - .load_manifest(self.table.file_io()) - .await?; + + let file_io = self.table.file_io(); + let mut manifest_futures: FuturesUnordered<_> = manifest_list + .entries() + .iter() + .map(|entry| entry.load_manifest(file_io)) + .collect(); + + while let Some(manifest) = manifest_futures.try_next().await? { for entry in manifest.entries() { let file_path = entry.file_path(); if new_files.contains(file_path) && entry.is_alive() { From 65da6db877da228882f9936761d70d6f927e615e Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 31 Mar 2026 13:52:42 +0300 Subject: [PATCH 02/11] Change to task per fetch --- crates/iceberg/src/transaction/snapshot.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8c6c5f1501..7ff421a3e2 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -24,6 +24,7 @@ use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; +use crate::runtime::spawn; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, @@ -178,14 +179,16 @@ impl<'a> SnapshotProducer<'a> { .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - let file_io = self.table.file_io(); - let mut manifest_futures: FuturesUnordered<_> = manifest_list - .entries() - .iter() - .map(|entry| entry.load_manifest(file_io)) + let mut manifest_tasks: FuturesUnordered<_> = manifest_list + .consume_entries() + .into_iter() + .map(|entry| { + let file_io = self.table.file_io().clone(); + spawn(async move { entry.load_manifest(&file_io).await }) + }) .collect(); - while let Some(manifest) = manifest_futures.try_next().await? { + while let Some(manifest) = manifest_tasks.try_next().await? { for entry in manifest.entries() { let file_path = entry.file_path(); if new_files.contains(file_path) && entry.is_alive() { From b9d544fc2bce9b0a036d530b32a66c03e6434ffc Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Wed, 1 Apr 2026 11:31:07 +0300 Subject: [PATCH 03/11] Move to buffered and threads --- crates/iceberg/src/transaction/snapshot.rs | 27 +++++++++++----------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 7ff421a3e2..988bbf3ebc 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,8 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; -use futures::TryStreamExt; -use futures::stream::FuturesUnordered; +use futures::{StreamExt, TryStreamExt}; use uuid::Uuid; use crate::error::Result; @@ -179,23 +178,23 @@ impl<'a> SnapshotProducer<'a> { .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) .await?; - let mut manifest_tasks: FuturesUnordered<_> = manifest_list - .consume_entries() - .into_iter() + let entries: Vec<_> = manifest_list.consume_entries().into_iter().collect(); + futures::stream::iter(entries) .map(|entry| { let file_io = self.table.file_io().clone(); spawn(async move { entry.load_manifest(&file_io).await }) }) - .collect(); - - while let Some(manifest) = manifest_tasks.try_next().await? { - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); + .buffer_unordered(32) + .try_for_each(|manifest| { + for entry in manifest.entries() { + let file_path = entry.file_path(); + if new_files.contains(file_path) && entry.is_alive() { + referenced_files.push(file_path.to_string()); + } } - } - } + std::future::ready(Ok(())) + }) + .await?; } if !referenced_files.is_empty() { From 5a91720bef5f17d9d4265c087b8ba4ced261737f Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 5 May 2026 10:09:44 +0300 Subject: [PATCH 04/11] Move num threads to constant --- crates/iceberg/src/transaction/snapshot.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 988bbf3ebc..f4c2f5a9f7 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -35,6 +35,7 @@ use crate::transaction::ActionCommit; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; +const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32; /// A trait that defines how different table operations produce new snapshots. /// @@ -184,7 +185,7 @@ impl<'a> SnapshotProducer<'a> { let file_io = self.table.file_io().clone(); spawn(async move { entry.load_manifest(&file_io).await }) }) - .buffer_unordered(32) + .buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES) .try_for_each(|manifest| { for entry in manifest.entries() { let file_path = entry.file_path(); From 84f65d2fb973669c32b3895bd5495adef708d144 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 5 May 2026 14:44:52 +0300 Subject: [PATCH 05/11] Add docstring to explain the motivation of the variable and its value --- crates/iceberg/src/transaction/snapshot.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index f4c2f5a9f7..6a0feb8aac 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -35,6 +35,8 @@ use crate::transaction::ActionCommit; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; +/// Control the number of threads used to verify duplicate files. +/// This needs to balance the degree of parallelism and resource utilisation. const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32; /// A trait that defines how different table operations produce new snapshots. From da2e74290b2e11962bc52b4a1a2d7176bb5ac5bb Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 12 May 2026 10:58:42 +0300 Subject: [PATCH 06/11] Trigger Build From bb2119e202285f795d4f722ba4e264558cab5c88 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 19 May 2026 16:31:34 +0300 Subject: [PATCH 07/11] Use the new runtime api and spawn all tasks at once --- crates/iceberg/src/transaction/snapshot.rs | 68 +++++++++++++--------- 1 file changed, 39 insertions(+), 29 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 8f43499692..76f52e080b 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,11 +19,11 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; -use futures::{StreamExt, TryStreamExt}; +use futures::future::OptionFuture; +use futures::{StreamExt, TryFutureExt, TryStreamExt}; use uuid::Uuid; use crate::error::Result; -use crate::runtime::spawn; use crate::spec::{ DataFile, DataFileFormat, FormatVersion, MAIN_BRANCH, ManifestContentType, ManifestEntry, ManifestFile, ManifestListWriter, ManifestWriter, ManifestWriterBuilder, Operation, Snapshot, @@ -35,9 +35,6 @@ use crate::transaction::ActionCommit; use crate::{Error, ErrorKind, TableRequirement, TableUpdate}; const META_ROOT_PATH: &str = "metadata"; -/// Control the number of threads used to verify duplicate files. -/// This needs to balance the degree of parallelism and resource utilisation. -const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32; /// A trait that defines how different table operations produce new snapshots. /// @@ -175,30 +172,43 @@ impl<'a> SnapshotProducer<'a> { .map(|df| df.file_path.as_str()) .collect(); - let mut referenced_files = Vec::new(); - if let Some(current_snapshot) = self.table.metadata().current_snapshot() { - let manifest_list = current_snapshot - .load_manifest_list(self.table.file_io(), &self.table.metadata_ref()) - .await?; - - let entries: Vec<_> = manifest_list.consume_entries().into_iter().collect(); - futures::stream::iter(entries) - .map(|entry| { - let file_io = self.table.file_io().clone(); - spawn(async move { entry.load_manifest(&file_io).await }) - }) - .buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES) - .try_for_each(|manifest| { - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); - } - } - std::future::ready(Ok(())) - }) - .await?; - } + let runtime = self.table.runtime().clone(); + let file_io = self.table.file_io(); + let metadata_ref = self.table.metadata_ref(); + + let referenced_files: Vec = + OptionFuture::from(self.table.metadata().current_snapshot().map(|snapshot| { + snapshot + .load_manifest_list(&file_io, &metadata_ref) + .and_then(|manifest_list| { + futures::stream::iter( + manifest_list + .consume_entries() + .into_iter() + .map(|entry| { + let file_io = file_io.clone(); + runtime + .io() + .spawn(async move { entry.load_manifest(&file_io).await }) + }) + .collect::>(), + ) + .then(|handle| async move { handle.await? }) + .try_fold(Vec::new(), |mut acc, manifest| { + acc.extend( + manifest + .entries() + .iter() + .filter(|e| new_files.contains(e.file_path()) && e.is_alive()) + .map(|e| e.file_path().to_string()), + ); + std::future::ready(Ok(acc)) + }) + }) + })) + .await + .transpose()? + .unwrap_or_default(); if !referenced_files.is_empty() { return Err(Error::new( From b202048622a3c2467c9be3fc154a7954fb977645 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Wed, 20 May 2026 11:07:21 +0300 Subject: [PATCH 08/11] fix clippy --- crates/iceberg/src/transaction/snapshot.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 76f52e080b..9aad997de1 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -179,7 +179,7 @@ impl<'a> SnapshotProducer<'a> { let referenced_files: Vec = OptionFuture::from(self.table.metadata().current_snapshot().map(|snapshot| { snapshot - .load_manifest_list(&file_io, &metadata_ref) + .load_manifest_list(file_io, &metadata_ref) .and_then(|manifest_list| { futures::stream::iter( manifest_list From 0ea86b0a311e74c76da9b67cca403e57bc6d4ff8 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Fri, 22 May 2026 13:58:31 +0300 Subject: [PATCH 09/11] Use try_join_all and simple loop per PR feedback Replaces the stream + then + try_fold chain with try_join_all and a plain for-loop. then resolved handles in stream order which limited concurrency; try_join_all lets all spawned manifest loads complete in parallel. The filtering loop runs in microseconds and isn't a hot path, so the combinator chain wasn't earning its complexity. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/iceberg/src/transaction/snapshot.rs | 68 ++++++++++------------ 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 9aad997de1..e97bfd0989 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,8 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; -use futures::future::OptionFuture; -use futures::{StreamExt, TryFutureExt, TryStreamExt}; +use futures::future::try_join_all; use uuid::Uuid; use crate::error::Result; @@ -166,49 +165,42 @@ impl<'a> SnapshotProducer<'a> { } pub(crate) async fn validate_duplicate_files(&self) -> Result<()> { + let Some(current_snapshot) = self.table.metadata().current_snapshot() else { + return Ok(()); + }; + let new_files: HashSet<&str> = self .added_data_files .iter() .map(|df| df.file_path.as_str()) .collect(); - let runtime = self.table.runtime().clone(); + let runtime = self.table.runtime(); let file_io = self.table.file_io(); - let metadata_ref = self.table.metadata_ref(); - - let referenced_files: Vec = - OptionFuture::from(self.table.metadata().current_snapshot().map(|snapshot| { - snapshot - .load_manifest_list(file_io, &metadata_ref) - .and_then(|manifest_list| { - futures::stream::iter( - manifest_list - .consume_entries() - .into_iter() - .map(|entry| { - let file_io = file_io.clone(); - runtime - .io() - .spawn(async move { entry.load_manifest(&file_io).await }) - }) - .collect::>(), - ) - .then(|handle| async move { handle.await? }) - .try_fold(Vec::new(), |mut acc, manifest| { - acc.extend( - manifest - .entries() - .iter() - .filter(|e| new_files.contains(e.file_path()) && e.is_alive()) - .map(|e| e.file_path().to_string()), - ); - std::future::ready(Ok(acc)) - }) - }) - })) - .await - .transpose()? - .unwrap_or_default(); + let manifest_list = current_snapshot + .load_manifest_list(file_io, &self.table.metadata_ref()) + .await?; + + let manifests = try_join_all(manifest_list.consume_entries().into_iter().map(|entry| { + let file_io = file_io.clone(); + async move { + runtime + .io() + .spawn(async move { entry.load_manifest(&file_io).await }) + .await? + } + })) + .await?; + + let mut referenced_files = Vec::new(); + for manifest in &manifests { + for entry in manifest.entries() { + let file_path = entry.file_path(); + if new_files.contains(file_path) && entry.is_alive() { + referenced_files.push(file_path.to_string()); + } + } + } if !referenced_files.is_empty() { return Err(Error::new( From 5792668f1ce6a1303c677476b51030884dbaaabc Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Fri, 22 May 2026 14:13:05 +0300 Subject: [PATCH 10/11] Fuse filter into manifest-load iteration Each future now returns the filtered file paths from its own manifest; try_join_all yields Vec> which is flattened. Avoids materializing all Manifests before filtering. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/iceberg/src/transaction/snapshot.rs | 41 ++++++++++++---------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index e97bfd0989..5b7bd790e6 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -181,26 +181,29 @@ impl<'a> SnapshotProducer<'a> { .load_manifest_list(file_io, &self.table.metadata_ref()) .await?; - let manifests = try_join_all(manifest_list.consume_entries().into_iter().map(|entry| { - let file_io = file_io.clone(); - async move { - runtime - .io() - .spawn(async move { entry.load_manifest(&file_io).await }) - .await? - } - })) - .await?; - - let mut referenced_files = Vec::new(); - for manifest in &manifests { - for entry in manifest.entries() { - let file_path = entry.file_path(); - if new_files.contains(file_path) && entry.is_alive() { - referenced_files.push(file_path.to_string()); + let new_files_ref = &new_files; + let referenced_files: Vec = + try_join_all(manifest_list.consume_entries().into_iter().map(|entry| { + let file_io = file_io.clone(); + async move { + let manifest = runtime + .io() + .spawn(async move { entry.load_manifest(&file_io).await }) + .await??; + Ok::<_, Error>( + manifest + .entries() + .iter() + .filter(|e| new_files_ref.contains(e.file_path()) && e.is_alive()) + .map(|e| e.file_path().to_string()) + .collect::>(), + ) } - } - } + })) + .await? + .into_iter() + .flatten() + .collect(); if !referenced_files.is_empty() { return Err(Error::new( From 69e425c42001ccd61f86c2a272a206d075a8e2a6 Mon Sep 17 00:00:00 2001 From: Victor Ghita Date: Tue, 2 Jun 2026 16:54:29 +0300 Subject: [PATCH 11/11] Increase throughput with try_concat --- crates/iceberg/src/transaction/snapshot.rs | 44 +++++++++++----------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index 5b7bd790e6..50a1b22f44 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -19,7 +19,8 @@ use std::collections::{HashMap, HashSet}; use std::future::Future; use std::ops::RangeFrom; -use futures::future::try_join_all; +use futures::TryStreamExt; +use futures::stream::FuturesUnordered; use uuid::Uuid; use crate::error::Result; @@ -182,28 +183,27 @@ impl<'a> SnapshotProducer<'a> { .await?; let new_files_ref = &new_files; - let referenced_files: Vec = - try_join_all(manifest_list.consume_entries().into_iter().map(|entry| { - let file_io = file_io.clone(); - async move { - let manifest = runtime - .io() - .spawn(async move { entry.load_manifest(&file_io).await }) - .await??; - Ok::<_, Error>( - manifest - .entries() - .iter() - .filter(|e| new_files_ref.contains(e.file_path()) && e.is_alive()) - .map(|e| e.file_path().to_string()) - .collect::>(), - ) - } - })) - .await? + let referenced_files: Vec = manifest_list + .consume_entries() .into_iter() - .flatten() - .collect(); + .map(|entry| { + let file_io = file_io.clone(); + runtime + .io() + .spawn(async move { entry.load_manifest(&file_io).await }) + }) + .collect::>() + .try_fold(Vec::new(), |mut acc, manifest| async move { + acc.extend( + manifest? + .entries() + .iter() + .filter(|e| new_files_ref.contains(e.file_path()) && e.is_alive()) + .map(|e| e.file_path().to_string()), + ); + Ok(acc) + }) + .await?; if !referenced_files.is_empty() { return Err(Error::new(