Increase the throughput of the validate_duplicate_files #2296

Open
tomighita wants to merge 19 commits into apache:main from dbt-labs:tomighita/increase-duplicate-check-throughput

@tomighita
Contributor

@tomighita tomighita commented Mar 30, 2026

Which issue does this PR close?

What changes are included in this PR?

Increase the throughput of validate_duplicate_files by starting all manifest load requests up front and polling them concurrently, rather than fetching each file sequentially.
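
To make the difference concrete, here is a minimal, std-only sketch. It is not the PR's code: `load_manifest` is a hypothetical stand-in that sleeps to simulate one object-store read, and scoped threads stand in for the async tasks the PR actually uses.

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for `entry.load_manifest(file_io)`: sleeps to
/// simulate one object-store round trip, then returns a fake manifest name.
fn load_manifest(path: &str) -> String {
    thread::sleep(Duration::from_millis(50));
    format!("manifest({path})")
}

/// Sequential baseline: each load starts only after the previous one finished.
fn load_sequential(paths: &[&str]) -> Vec<String> {
    paths.iter().map(|p| load_manifest(p)).collect()
}

/// Starts every load first, then waits for all of them.
/// Scoped threads are an assumption made to keep the sketch std-only.
fn load_concurrent(paths: &[&str]) -> Vec<String> {
    thread::scope(|s| {
        // Collecting the handles is what starts all loads up front.
        let handles: Vec<_> = paths
            .iter()
            .map(|p| s.spawn(move || load_manifest(p)))
            .collect();
        // Joining in input order keeps the output order stable.
        handles
            .into_iter()
            .map(|h| h.join().expect("loader thread panicked"))
            .collect()
    })
}
```

With N paths, `load_sequential` takes roughly N simulated round trips, while `load_concurrent` takes roughly one, because the waits overlap.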

Are these changes tested?

No extra tests are needed: the functionality should be equivalent, and the existing tests should cover this behaviour.

@tomighita tomighita marked this pull request as ready for review March 30, 2026 12:22
.entries()
.iter()
.map(|entry| entry.load_manifest(file_io))
.collect();
Collaborator

@CTTY CTTY Mar 30, 2026

Should we use buffer_unordered here? This is an IO operation, and too many concurrent requests may overwhelm the storage backend.

.try_buffer_unordered(32) should keep most object stores happy.
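
A rough, std-only sketch of what such a cap buys: at most `limit` loads are in flight at once, and results come back in completion order. It models the idea behind `buffer_unordered(32)` with worker threads rather than async streams. `load_with_limit` and the `load` closure are hypothetical names, not part of the PR.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Runs `load` over `paths` with at most `limit` calls in flight at once.
/// Results are returned in completion order, not input order, together
/// with the peak number of loads that were observed running concurrently.
fn load_with_limit<F>(paths: &[String], limit: usize, load: F) -> (Vec<String>, usize)
where
    F: Fn(&str) -> String + Sync,
{
    let queue = Mutex::new(paths.iter());
    let results = Mutex::new(Vec::with_capacity(paths.len()));
    let in_flight = AtomicUsize::new(0);
    let peak = AtomicUsize::new(0);

    thread::scope(|s| {
        // `limit` workers means at most `limit` loads run at the same time.
        for _ in 0..limit.max(1) {
            s.spawn(|| loop {
                // Take the next path; the lock is released before loading.
                let next = queue.lock().unwrap().next();
                let Some(path) = next else { break };

                let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now, Ordering::SeqCst);
                let manifest = load(path);
                in_flight.fetch_sub(1, Ordering::SeqCst);

                results.lock().unwrap().push(manifest);
            });
        }
    });

    (results.into_inner().unwrap(), peak.into_inner())
}
```

Calling it with `limit = 32` would mirror the suggested bound; the returned peak never exceeds `limit`, whatever the number of paths.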

@tomighita
Contributor Author

Thanks for your suggestion @CTTY! I have also incorporated @liurenjie1024's feedback from Slack, but I am a bit concerned about allocating threads without being explicit. For instance, in other places we set the thread count explicitly [ref].

Any thoughts?

@tomighita tomighita requested a review from CTTY April 1, 2026 10:28
@github-actions
Contributor

github-actions Bot commented May 2, 2026

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.

@github-actions github-actions Bot added the stale label May 2, 2026
@tomighita
Contributor Author

Can anyone take a look over this, so we can get this merged?

let file_io = self.table.file_io().clone();
spawn(async move { entry.load_manifest(&file_io).await })
})
.buffer_unordered(32)
Contributor

Should 32 be a shared constant someplace?

Collaborator

+1 on making this a constant

Contributor Author

Good point. Moved. If you don't love the name, feel free to suggest a new one 😅

@github-actions github-actions Bot removed the stale label May 5, 2026
@tomighita tomighita requested a review from emkornfield May 5, 2026 07:15
Contributor

@emkornfield emkornfield left a comment

One minor question: I don't know if the constant should be more centralized (e.g. do we want to limit all file operations to the same parallelism?), but that could be refactored later.

use crate::{Error, ErrorKind, TableRequirement, TableUpdate};

const META_ROOT_PATH: &str = "metadata";
const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32;
Contributor

Maybe add a doc comment explaining why it is 32.
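
One possible shape for such a doc comment, as a sketch. The wording and the rationale are assumptions drawn from this thread (the earlier review comment about not overwhelming the storage backend), not the text that was actually committed.

```rust
/// Maximum number of manifest loads kept in flight while checking for
/// duplicate files.
///
/// Manifest loads are I/O-bound, so running them concurrently raises
/// throughput, but an unbounded fan-out may overwhelm the storage backend.
/// 32 is the value suggested during review as a limit most object stores
/// should tolerate; it is a heuristic, not a measured optimum.
const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32;
```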

Contributor Author

Done!

@tomighita
Contributor Author

tomighita commented May 5, 2026

Ty for the review! 🥳

One minor question: I don't know if the constant should be more centralized (e.g. do we want to limit all file operations to the same parallelism?), but that could be refactored later.

@emkornfield I like the idea of centralising it, but I am afraid this value does not translate well to other operations. I would be in favour of refactoring this later and re-using it where needed, if needed.

futures::stream::iter(entries)
.map(|entry| {
let file_io = self.table.file_io().clone();
spawn(async move { entry.load_manifest(&file_io).await })
Contributor

We should use the newly added Runtime API rather than a raw tokio call.

let file_io = self.table.file_io().clone();
spawn(async move { entry.load_manifest(&file_io).await })
})
.buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES)
Contributor

We will not need this if we use the Runtime api.

spawn(async move { entry.load_manifest(&file_io).await })
})
.buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES)
.try_for_each(|manifest| {
Contributor

This is not idiomatic Rust. You could simply do the following:

let referenced_files = streams.flat_map(_.entries().map(_.file_path)).filter(...).collect();
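
In valid Rust syntax, the chain sketched above might look like the following. It is shown on plain iterators rather than streams. The `Manifest` and `Entry` types are simplified stand-ins for illustration, and the filter predicate (membership in a set of newly added files) is an assumption, since the comment elides it.

```rust
use std::collections::HashSet;

/// Simplified stand-in for a manifest entry.
struct Entry {
    file_path: String,
}

/// Simplified stand-in for a loaded manifest.
struct Manifest {
    entries: Vec<Entry>,
}

impl Manifest {
    fn entries(&self) -> &[Entry] {
        &self.entries
    }
}

/// Flattens all entries, projects the file path, and keeps the paths that
/// are also in `added_files` (an assumed predicate).
fn referenced_files(manifests: &[Manifest], added_files: &HashSet<String>) -> Vec<String> {
    manifests
        .iter()
        .flat_map(|manifest| manifest.entries())
        .map(|entry| entry.file_path.as_str())
        .filter(|path| added_files.contains(*path))
        .map(str::to_owned)
        .collect()
}
```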

@tomighita tomighita requested a review from blackmwk May 19, 2026 13:31
})
.collect::<Vec<_>>(),
)
.then(|handle| async move { handle.await? })
Collaborator

I think this will resolve each handle in stream order. Since we don't really care about the order of results here, try_join_all would be better. It would also be more readable.

For the filtering logic we can still use a for-loop, since it's not really the bottleneck here.
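
A std-only sketch of the shape suggested here: wait for every load, fail on the first error, then filter with a plain loop. `join_all_or_first_error` is a hypothetical helper that imitates the all-or-first-error result of `try_join_all` using thread handles instead of futures. The `Manifest` struct and the `added_files` set are stand-ins, not iceberg-rust types.

```rust
use std::collections::HashSet;
use std::thread::{self, JoinHandle};

/// Stand-in for a loaded manifest: just the data file paths it references.
struct Manifest {
    file_paths: Vec<String>,
}

/// Spawns one loader thread per input; `Err` inputs simulate a failed read.
fn spawn_loads(
    inputs: Vec<Result<Vec<String>, String>>,
) -> Vec<JoinHandle<Result<Manifest, String>>> {
    inputs
        .into_iter()
        .map(|input| thread::spawn(move || input.map(|file_paths| Manifest { file_paths })))
        .collect()
}

/// Waits for the handles and returns all manifests, or the first error found.
/// The threads were started before this call, so they run in parallel
/// while we wait on them one by one.
fn join_all_or_first_error(
    handles: Vec<JoinHandle<Result<Manifest, String>>>,
) -> Result<Vec<Manifest>, String> {
    handles
        .into_iter()
        .map(|handle| match handle.join() {
            Ok(result) => result,
            Err(_) => Err("loader thread panicked".to_string()),
        })
        .collect()
}

/// Plain for-loop filter: keeps paths that are also in `added_files`.
fn find_duplicates(manifests: &[Manifest], added_files: &HashSet<String>) -> Vec<String> {
    let mut duplicates = Vec::new();
    for manifest in manifests {
        for path in &manifest.file_paths {
            if added_files.contains(path) {
                duplicates.push(path.clone());
            }
        }
    }
    duplicates
}
```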

Contributor

+1, this actually limited the concurrency.

.io()
.spawn(async move { entry.load_manifest(&file_io).await })
})
.collect::<Vec<_>>(),
Contributor

Do we really need to collect to vec here?

.collect::<Vec<_>>(),
)
.then(|handle| async move { handle.await? })
.try_fold(Vec::new(), |mut acc, manifest| {
Contributor

This is unnecessarily complicated. The whole thing could be simplified as:

stream.flat_map_unordered().filter().collect()

tomighita and others added 4 commits May 22, 2026 13:58
Replaces the stream + then + try_fold chain with try_join_all and a
plain for-loop. then resolved handles in stream order which limited
concurrency; try_join_all lets all spawned manifest loads complete in
parallel. The filtering loop runs in microseconds and isn't a hot path,
so the combinator chain wasn't earning its complexity.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Each future now returns the filtered file paths from its own manifest;
try_join_all yields Vec<Vec<String>> which is flattened. Avoids
materializing all Manifests before filtering.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
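
A std-only sketch of the restructuring the second commit message describes: each task filters its own manifest and returns only the matching paths, and the nested result is then flattened. Threads replace the spawned futures here, `load_manifest_paths` is a hypothetical loader, and the filter predicate (membership in `added_files`) is an assumption.

```rust
use std::collections::HashSet;
use std::thread;

/// Hypothetical loader: pretends each manifest lists two data files.
fn load_manifest_paths(manifest: &str) -> Result<Vec<String>, String> {
    if manifest.is_empty() {
        return Err("empty manifest path".to_string());
    }
    Ok(vec![
        format!("{manifest}/data-0.parquet"),
        format!("{manifest}/data-1.parquet"),
    ])
}

/// Each task loads one manifest and keeps only the paths in `added_files`,
/// so a full manifest never outlives its own task.
fn referenced_files(
    manifests: &[&str],
    added_files: &HashSet<String>,
) -> Result<Vec<String>, String> {
    let per_manifest: Result<Vec<Vec<String>>, String> = thread::scope(|s| {
        let handles: Vec<_> = manifests
            .iter()
            .map(|m| {
                s.spawn(move || -> Result<Vec<String>, String> {
                    let paths = load_manifest_paths(m)?;
                    Ok(paths
                        .into_iter()
                        .filter(|p| added_files.contains(p))
                        .collect())
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|h| h.join().expect("loader thread panicked"))
            .collect()
    });
    // Flatten Vec<Vec<String>> into a single Vec<String>.
    Ok(per_manifest?.into_iter().flatten().collect())
}
```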
@tomighita
Contributor Author

Sorry folks, I was a bit caught up with some other stuff and could not address this. I have changed it to group the handles into a FuturesUnordered, but I am not sure how to rewrite the try_fold bits as @blackmwk suggested... 😓

@tomighita tomighita requested review from CTTY and blackmwk June 2, 2026 13:56
Collaborator

@CTTY CTTY left a comment

I was thinking more of something like the following, to make it slightly more readable:

let manifests: Vec<Manifest> = manifest_list
    .consume_entries()
    .into_iter()
    .map(|entry| {
        let file_io = file_io.clone();
        runtime
            .io()
            .spawn(..)
    })
    .collect::<FuturesUnordered<_>>()
    .await?;

let referenced_files: Vec<String> = manifests
    .iter()
    .flat_map(|m| m.entries())
    .filter(....)
    .collect();

But the existing code looks good to me!

Development

Successfully merging this pull request may close these issues.

Improve throughput of validate_duplicate_files

4 participants