Increase the throughput of the validate_duplicate_files#2296
tomighita wants to merge 19 commits
    .entries()
    .iter()
    .map(|entry| entry.load_manifest(file_io))
    .collect();
Should we use buffer_unordered here? This is an I/O operation, and too many concurrent requests may overwhelm the storage backend.
.try_buffer_unordered(32) should keep most object stores happy.
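To illustrate the idea behind the suggestion, here is a minimal std-only sketch of bounded concurrency. It is not the PR's async code: it swaps the `futures` stream combinator for plain OS threads pulling from a shared queue, and `load_manifest` and `load_bounded` are hypothetical names invented for this example. The point it shows is only that at most `limit` loads are ever in flight.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for an I/O-bound manifest load.
fn load_manifest(id: usize) -> Result<String, String> {
    thread::sleep(Duration::from_millis(2));
    Ok(format!("manifest-{id}"))
}

/// Loads every id with at most `limit` loads in flight at once.
/// Returns the loaded values (in no particular order) together with the
/// highest number of simultaneous loads that was observed.
fn load_bounded(ids: Vec<usize>, limit: usize) -> Result<(Vec<String>, usize), String> {
    let queue = Arc::new(Mutex::new(VecDeque::from(ids)));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));

    // One worker per concurrency slot; each worker handles one load at a time.
    let workers: Vec<_> = (0..limit)
        .map(|_| {
            let (queue, in_flight, peak) = (queue.clone(), in_flight.clone(), peak.clone());
            thread::spawn(move || -> Result<Vec<String>, String> {
                let mut loaded = Vec::new();
                loop {
                    // The lock guard is dropped at the end of this statement,
                    // so the slow load below runs without holding the lock.
                    let next = queue.lock().unwrap().pop_front();
                    let Some(id) = next else { break };
                    let now = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now, Ordering::SeqCst);
                    let result = load_manifest(id);
                    in_flight.fetch_sub(1, Ordering::SeqCst);
                    loaded.push(result?);
                }
                Ok(loaded)
            })
        })
        .collect();

    let mut all = Vec::new();
    for worker in workers {
        all.extend(worker.join().expect("worker panicked")?);
    }
    Ok((all, peak.load(Ordering::SeqCst)))
}
```

Calling `load_bounded((0..100).collect(), 8)` loads all 100 items while the observed peak stays at or below 8, which is the same guarantee a buffer size of 32 is meant to give the object store.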
Thanks for your suggestion @CTTY! I have also incorporated @liurenjie1024's feedback from Slack, but I am a bit concerned about allocating threads without being explicit. For instance, in other places we explicitly set the thread count [ref]. Any thoughts?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
Can anyone take a look at this so we can get it merged?
        let file_io = self.table.file_io().clone();
        spawn(async move { entry.load_manifest(&file_io).await })
    })
    .buffer_unordered(32)
Should 32 be a shared constant someplace?
Good point. Moved. If you don't love the name, feel free to suggest a new one 😅
emkornfield left a comment
One minor question: I don't know whether the constant should be more centralized (e.g. do we want to limit all file operations to the same parallelism?), but that could be refactored later.
    use crate::{Error, ErrorKind, TableRequirement, TableUpdate};

    const META_ROOT_PATH: &str = "metadata";
    const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32;
Maybe add a doc comment explaining why it is 32.
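A doc comment along the following lines might address this. It is only a sketch: the rationale is pieced together from the earlier review comments in this thread (I/O-bound loads, not overwhelming the storage backend), and the wording is an assumption, not what was merged.

```rust
/// Upper bound on the number of manifest files loaded concurrently while
/// validating duplicate files.
///
/// Loading a manifest is an I/O request against the storage backend, so the
/// bound keeps a large manifest list from issuing too many requests at once.
/// The value 32 follows the review suggestion that a buffer of this size
/// keeps most object stores happy; it is a heuristic, not a measured optimum.
const NUM_THREADS_VALIDATE_DUPLICATE_FILES: usize = 32;
```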
Ty for the review! 🥳
@emkornfield I like the idea of centralising, but I am afraid it does not translate well to other operations here. I would be in favour of refactoring this later and re-using it where needed, if needed.
    futures::stream::iter(entries)
        .map(|entry| {
            let file_io = self.table.file_io().clone();
            spawn(async move { entry.load_manifest(&file_io).await })
We should use the newly added Runtime API rather than a raw tokio call.
        let file_io = self.table.file_io().clone();
        spawn(async move { entry.load_manifest(&file_io).await })
    })
    .buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES)
We will not need this if we use the Runtime API.
        spawn(async move { entry.load_manifest(&file_io).await })
    })
    .buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES)
    .try_for_each(|manifest| {
This is not idiomatic Rust. You could simply do the following:
    let referenced_files = streams.flat_map(_.entries().map(_.file_path).filter(...)).collect();
        })
        .collect::<Vec<_>>(),
    )
    .then(|handle| async move { handle.await? })
I think this will resolve each handle in stream order. Since we don't really care about the order of results here, try_join_all would be better. It would also be more readable.
For the filtering logic we can still use a for-loop, since it's not really the bottleneck here.
+1, this actually limited the concurrency.
            .io()
            .spawn(async move { entry.load_manifest(&file_io).await })
        })
        .collect::<Vec<_>>(),
Do we really need to collect to vec here?
        .collect::<Vec<_>>(),
    )
    .then(|handle| async move { handle.await? })
    .try_fold(Vec::new(), |mut acc, manifest| {
This is unnecessarily complicated. The whole thing could be simplified as:
stream.flat_map_unordered().filter().collect()
Replaces the stream + then + try_fold chain with try_join_all and a plain for-loop. The then combinator resolved handles in stream order, which limited concurrency; try_join_all lets all spawned manifest loads complete in parallel. The filtering loop runs in microseconds and isn't a hot path, so the combinator chain wasn't earning its complexity.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Each future now returns the filtered file paths from its own manifest; try_join_all yields Vec<Vec<String>>, which is flattened. Avoids materializing all Manifests before filtering.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
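The shape these two commits describe (start every load, let each task return only its own filtered paths, then flatten the per-manifest results) can be sketched with std threads. This is an analogue under stated assumptions, not the PR's code: scoped threads stand in for spawned futures and try_join_all, and `load_manifest_paths` and `referenced_files` are hypothetical names.

```rust
use std::collections::HashSet;
use std::thread;

/// Hypothetical stand-in for loading one manifest: returns the file paths
/// that the manifest references.
fn load_manifest_paths(manifest_id: usize) -> Result<Vec<String>, String> {
    Ok(vec![
        format!("data/{manifest_id}-a.parquet"),
        format!("data/{manifest_id}-b.parquet"),
    ])
}

/// Starts one load per manifest, filters inside each task, then flattens.
fn referenced_files(
    manifest_ids: &[usize],
    new_files: &HashSet<String>,
) -> Result<Vec<String>, String> {
    thread::scope(|scope| {
        // Every load is started before any result is collected.
        let handles: Vec<_> = manifest_ids
            .iter()
            .map(|&id| {
                scope.spawn(move || {
                    let paths = load_manifest_paths(id)?;
                    // Filter here so the full manifest is never kept around.
                    Ok::<Vec<String>, String>(
                        paths.into_iter().filter(|p| new_files.contains(p)).collect(),
                    )
                })
            })
            .collect();

        // Plain for-loop: flatten the per-manifest results, stopping at the
        // first error.
        let mut referenced = Vec::new();
        for handle in handles {
            referenced.extend(handle.join().expect("loader panicked")?);
        }
        Ok(referenced)
    })
}
```

With manifests 0, 1 and 2 and a set of new files containing `data/1-a.parquet`, the function returns just that one path.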
Sorry folks, was a bit caught up with some other stuff and could not address this. I have changed it to group
CTTY left a comment
I was thinking more of something like the following to make it slightly more readable:
    let manifests: Vec<Manifest> = manifest_list
        .consume_entries()
        .into_iter()
        .map(|entry| {
            let file_io = file_io.clone();
            runtime
                .io()
                .spawn(..)
        })
        .collect::<FuturesUnordered<_>>()
        // FuturesUnordered is a stream, so it has to be drained before awaiting;
        // try_collect assumes each handle resolves to a Result.
        .try_collect()
        .await?;
    let referenced_files: Vec<String> = manifests
        .iter()
        .flat_map(|m| m.entries())
        .filter(....)
        .collect();
But the existing code looks good to me!
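The second half of that suggestion, the flat_map + filter + collect pass over already-loaded manifests, can be shown in isolation. The sketch below is self-contained and uses deliberately simplified, hypothetical `Manifest` and `Entry` types (the real ones carry much more metadata), so treat it as an illustration of the iterator chain only.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified manifest entry: just the data file path.
struct Entry {
    file_path: String,
}

/// Hypothetical, simplified manifest: a list of entries.
struct Manifest {
    entries: Vec<Entry>,
}

impl Manifest {
    fn entries(&self) -> &[Entry] {
        &self.entries
    }
}

/// Collects every path referenced by any manifest that is also in `new_files`.
fn referenced_files(manifests: &[Manifest], new_files: &HashSet<&str>) -> Vec<String> {
    manifests
        .iter()
        // Flatten all manifests into one sequence of entries.
        .flat_map(|m| m.entries())
        .map(|e| e.file_path.as_str())
        // Keep only the paths that are about to be added again.
        .filter(|path| new_files.contains(path))
        .map(|path| path.to_string())
        .collect()
}
```

Because the chain only borrows the manifests, it keeps manifest order and entry order, which makes the result deterministic once the loads themselves have finished.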
Which issue does this PR close?
What changes are included in this PR?
Increase the throughput of the validate_duplicate_files by starting all requests and polling rather than sequentially fetching each file.
Are these changes tested?
No need to add extra tests since the functionality should be equivalent and existing tests should capture this behaviour.