Skip to content
Open
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1d10f6b
Implement futures unordered when checking
tomighita Mar 30, 2026
96777d5
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita Mar 30, 2026
65da6db
Change to task per fetch
tomighita Mar 31, 2026
b9d544f
Move to buffered and threads
tomighita Apr 1, 2026
9319652
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita May 4, 2026
5a91720
Move num threads to constant
tomighita May 5, 2026
3c2da2c
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita May 5, 2026
84f65d2
Add docstring to explain the motivation of the variable and its value
tomighita May 5, 2026
ec66e87
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita May 12, 2026
da2e742
Trigger Build
tomighita May 12, 2026
c0fc03f
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita May 12, 2026
dd72bb9
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita May 19, 2026
bb2119e
Use the new runtime api and spawn all tasks at once
tomighita May 19, 2026
4582374
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita May 20, 2026
b202048
fix clippy
tomighita May 20, 2026
0ea86b0
Use try_join_all and simple loop per PR feedback
tomighita May 22, 2026
5792668
Fuse filter into manifest-load iteration
tomighita May 22, 2026
69e425c
Increase throughput with try_concat
tomighita Jun 2, 2026
f994974
Merge branch 'main' into tomighita/increase-duplicate-check-throughput
tomighita Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 34 additions & 17 deletions crates/iceberg/src/transaction/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::ops::RangeFrom;

use futures::TryStreamExt;
use futures::stream::FuturesUnordered;
use uuid::Uuid;

use crate::error::Result;
Expand Down Expand Up @@ -164,29 +166,44 @@ impl<'a> SnapshotProducer<'a> {
}

pub(crate) async fn validate_duplicate_files(&self) -> Result<()> {
let Some(current_snapshot) = self.table.metadata().current_snapshot() else {
return Ok(());
};

let new_files: HashSet<&str> = self
.added_data_files
.iter()
.map(|df| df.file_path.as_str())
.collect();

let mut referenced_files = Vec::new();
if let Some(current_snapshot) = self.table.metadata().current_snapshot() {
let manifest_list = current_snapshot
.load_manifest_list(self.table.file_io(), &self.table.metadata_ref())
.await?;
for manifest_list_entry in manifest_list.entries() {
let manifest = manifest_list_entry
.load_manifest(self.table.file_io())
.await?;
for entry in manifest.entries() {
let file_path = entry.file_path();
if new_files.contains(file_path) && entry.is_alive() {
referenced_files.push(file_path.to_string());
}
}
}
}
let runtime = self.table.runtime();
let file_io = self.table.file_io();
let manifest_list = current_snapshot
.load_manifest_list(file_io, &self.table.metadata_ref())
.await?;

let new_files_ref = &new_files;
let referenced_files: Vec<String> = manifest_list
.consume_entries()
.into_iter()
.map(|entry| {
let file_io = file_io.clone();
runtime
.io()
.spawn(async move { entry.load_manifest(&file_io).await })
})
.collect::<FuturesUnordered<_>>()
.try_fold(Vec::new(), |mut acc, manifest| async move {
acc.extend(
manifest?
.entries()
.iter()
.filter(|e| new_files_ref.contains(e.file_path()) && e.is_alive())
.map(|e| e.file_path().to_string()),
);
Ok(acc)
})
.await?;

if !referenced_files.is_empty() {
return Err(Error::new(
Expand Down
Loading