From 36f1d40f20915cbd3803a7853d287b054bda496a Mon Sep 17 00:00:00 2001 From: Ping Yu Date: Mon, 27 Apr 2026 15:27:20 +0800 Subject: [PATCH] request: replace try_join_all with JoinSet in plan handlers Replace futures::future::try_join_all with tokio::task::JoinSet in single_plan_handler and multi_store_handler to join tasks as they complete rather than collecting all handles first. This provides better error handling with explicit JoinError propagation. Signed-off-by: Ping Yu --- src/request/plan.rs | 41 +++++++++++++++++++++++++++++++---------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/request/plan.rs b/src/request/plan.rs index 8bd15bb5..8c566b8a 100644 --- a/src/request/plan.rs +++ b/src/request/plan.rs @@ -5,9 +5,10 @@ use std::sync::Arc; use async_recursion::async_recursion; use async_trait::async_trait; -use futures::future::try_join_all; +use tokio::task::JoinSet; use futures::prelude::*; use log::debug; +use log::error; use log::info; use tokio::sync::Semaphore; use tokio::time::sleep; @@ -117,12 +118,13 @@ where preserve_region_results: bool, ) -> Result<::Result> { let shards = current_plan.shards(&pd_client).collect::>().await; - debug!("single_plan_handler, shards: {}", shards.len()); - let mut handles = Vec::with_capacity(shards.len()); + let shards_len = shards.len(); + debug!("single_plan_handler, shards: {}", shards_len); + let mut join_set = JoinSet::new(); for shard in shards { let (shard, region) = shard?; let clone = current_plan.clone_then_apply_shard(shard); - let handle = tokio::spawn(Self::single_shard_handler( + join_set.spawn(Self::single_shard_handler( pd_client.clone(), clone, region, @@ -130,10 +132,19 @@ where permits.clone(), preserve_region_results, )); - handles.push(handle); } - let results = try_join_all(handles).await?; + let mut results = Vec::with_capacity(shards_len); + while let Some(join_result) = join_set.join_next().await { + match join_result { + Ok(val) => results.push(val), + Err(e) => { + error!("failed to join task: {}", e); + return Err(Error::JoinError(e)); + } + } + } + if preserve_region_results { Ok(results .into_iter() @@ -449,18 +460,28 @@ where async fn execute(&self) -> Result { let concurrency_permits = Arc::new(Semaphore::new(MULTI_STORES_CONCURRENCY)); let stores = self.pd_client.clone().all_stores().await?; - let mut handles = Vec::with_capacity(stores.len()); + let stores_len = stores.len(); + let mut join_set = JoinSet::new(); for store in stores { let mut clone = self.inner.clone(); clone.apply_store(&store); - let handle = tokio::spawn(Self::single_store_handler( + join_set.spawn(Self::single_store_handler( clone, self.backoff.clone(), concurrency_permits.clone(), )); - handles.push(handle); } - let results = try_join_all(handles).await?; + + let mut results = Vec::with_capacity(stores_len); + while let Some(join_result) = join_set.join_next().await { + match join_result { + Ok(val) => results.push(val), + Err(e) => { + error!("failed to join task: {}", e); + return Err(Error::JoinError(e)); + } + } + } Ok(results.into_iter().collect::>()) } }