driver: parallelize unsupported order detection#4347

Open
metalurgical wants to merge 21 commits into cowprotocol:main from metalurgical:fix_chore_3516

Conversation

@metalurgical
Contributor

@metalurgical metalurgical commented Apr 20, 2026

Description

Allow unsupported order detection to be executed in parallel and optimize it.

Changes

  • Move without_unsupported_orders to risk detector.
  • Refactor without_unsupported_orders to do all unsupported order filtering (including flashloans) in one pass.
  • Introduce DetectorApi trait to decouple the simulation detector for tests.
  • Schedule filtering and liquidity fetching as separate tasks with tokio::spawn in solve(), allowing parallel execution.
  • Update quote.rs to use without_unsupported_orders.
  • Add test coverage.

How to test

cargo test -p driver

Related Issues

Fixes #3516. Follow-up PR to #4309, taking #4329 into account.

@metalurgical metalurgical requested a review from a team as a code owner April 20, 2026 21:24
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors the competition logic to parallelize unsupported order detection by updating the risk detector API to return a set of UIDs instead of modifying the auction in place. A critical logic error was identified in the new unsupported_order_uids method where orders flagged as unsupported by metrics are filtered out of the processing loop but never added to the removal set, effectively bypassing the intended filtering logic.
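
A minimal sketch of the failure mode described above, using hypothetical stand-in types (`Order`, `Quality`, and both functions are illustrative and not the driver's actual code). If metrics-flagged orders are dropped by a `filter` before the step that builds the removal set, their UIDs never reach that set:

```rust
use std::collections::HashSet;

// Hypothetical stand-ins for the driver's types.
#[derive(Clone, Copy, PartialEq)]
enum Quality {
    Supported,
    Unsupported,
}

struct Order {
    uid: u32,
    metrics: Quality, // quality reported by the metrics detector
    tokens: Quality,  // combined quality of the order's tokens
}

// Buggy shape: metrics-flagged orders are skipped before UIDs are collected,
// so they are never reported as unsupported.
fn unsupported_uids_buggy(orders: &[Order]) -> HashSet<u32> {
    orders
        .iter()
        .filter(|o| o.metrics != Quality::Unsupported)
        .filter_map(|o| (o.tokens == Quality::Unsupported).then_some(o.uid))
        .collect()
}

// Fixed shape: a metrics hit contributes its UID to the removal set.
fn unsupported_uids_fixed(orders: &[Order]) -> HashSet<u32> {
    orders
        .iter()
        .filter_map(|o| {
            let bad = o.metrics == Quality::Unsupported || o.tokens == Quality::Unsupported;
            bad.then_some(o.uid)
        })
        .collect()
}
```

In the buggy variant an order flagged only by metrics is absent from the returned set, so a caller that removes orders by UID would keep it.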

Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
@metalurgical
Contributor Author

@jmg-duarte This is minimal, easily reviewable and I think in line with what is expected for resolving the linked issue. Just finishing off what I started.

Member

@AryanGodara AryanGodara left a comment

left some minor comments

Comment thread crates/driver/src/domain/competition/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs
@metalurgical metalurgical marked this pull request as draft April 29, 2026 14:55
Make unsupported order detection read-only and execute it in parallel with sorting and data fetching. Apply filtering after update_orders to preserve existing ordering.
@metalurgical metalurgical marked this pull request as ready for review April 29, 2026 22:58
@metalurgical
Contributor Author

@AryanGodara Thank you for the comments. It should be ready to be reviewed again. Excuse the force push, had to rebase a branch in to ensure it was working and then rebase it out again.

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request refactors the risk detection logic by introducing a SellQualityDetector trait to decouple the simulation detector and improve testability. It updates the Competition and Quote domains to use the refactored filter_unsupported_orders_in_auction and unsupported_order_uids methods, enabling in-place filtering of unsupported orders. Additionally, comprehensive unit tests were added for the Detector logic. I have no feedback to provide as the review comments provided did not meet the high severity threshold required by the style guide.

Member

@AryanGodara AryanGodara left a comment

Description nit: "Run unsupported detection in parallel in solve()" fit the original version, but not entirely accurate imo after the prev commits

The real wins here are the read-only unsupported_order_uids API, so quote.rs can reuse detection without faking an Auction, and the SellQualityDetector trait that enables the new tests. Worth surfacing those in the description instead.
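
To illustrate why those two pieces help testing, here is a rough sketch under simplifying assumptions: the trait below is a synchronous, cut-down version of `SellQualityDetector` (the real one is async and takes a timestamp), and `Order`, `Detector`, and `RejectToken` are hypothetical stand-ins. The point is only that a read-only method over `&[Order]` plus a trait object lets a test plug in a stub instead of a real simulation:

```rust
use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq)]
enum Quality {
    Supported,
    Unsupported,
}

struct Order {
    uid: u32,
    sell_token: &'static str,
}

// Hypothetical, synchronous simplification of the trait discussed in the PR.
trait SellQualityDetector {
    fn determine_sell_token_quality(&self, order: &Order) -> Quality;
}

struct Detector {
    simulation_detector: Option<Box<dyn SellQualityDetector>>,
}

impl Detector {
    // Read-only: borrows the orders and only reports which UIDs are unsupported.
    fn unsupported_order_uids(&self, orders: &[Order]) -> HashSet<u32> {
        let Some(detector) = &self.simulation_detector else {
            // No detector configured => nothing is flagged in this sketch.
            return HashSet::new();
        };
        orders
            .iter()
            .filter(|o| detector.determine_sell_token_quality(o) == Quality::Unsupported)
            .map(|o| o.uid)
            .collect()
    }
}

// Test double: rejects exactly one sell token.
struct RejectToken(&'static str);

impl SellQualityDetector for RejectToken {
    fn determine_sell_token_quality(&self, order: &Order) -> Quality {
        if order.sell_token == self.0 {
            Quality::Unsupported
        } else {
            Quality::Supported
        }
    }
}
```

Because the method never mutates its input, a caller such as a quote path can pass a plain slice of orders without building a full auction around it.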

otherwise lgtm

@metalurgical
Contributor Author

metalurgical commented May 1, 2026

Description nit: "Run unsupported detection in parallel in solve()" fit the original version, but not entirely accurate imo after the prev commits

The real wins here are the read-only unsupported_order_uids API, so quote.rs can reuse detection without faking an Auction, and the SellQualityDetector trait that enables the new tests. Worth surfacing those in the description instead.

otherwise lgtm

Yes, this is true, with the latest commits it is effectively concurrent with liquidity fetching.

Edit: I have updated this to use tokio::spawn to schedule them as separate tasks and checked the PR description as well.

@metalurgical metalurgical changed the title driver: parallel unsupported order detection driver: parallelize unsupported order detection May 1, 2026
@metalurgical metalurgical requested a review from AryanGodara May 4, 2026 06:39
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/mod.rs Outdated
Comment thread crates/driver/src/infra/api/error.rs Outdated
Comment thread crates/driver/src/domain/quote.rs Outdated
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
@metalurgical metalurgical requested a review from jmg-duarte May 4, 2026 21:34
Contributor

@jmg-duarte jmg-duarte left a comment

I think the diff is going in a much better direction now, the core logic changes are much simpler and the separation you did made me think; good job

Please review all the comments before starting coding, there's one in specific I'd appreciate we discuss as it might be possible to separate out of the PR; and make this an incremental change 🚀

Comment thread crates/driver/src/domain/competition/risk_detector/bad_tokens/simulation.rs Outdated
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/mod.rs Outdated
}

#[tokio::test]
async fn without_unsupported_orders_filters_unsupported_orders_and_flashloans() {
Contributor

can't we just break this big test into multiple separate ones? im ok with repeated setup, the combinations here get a bit messy

you could do each case separate like "metrics bad" "token bad", etc

also the flashloan test is only useful for orders that would actually work if the flashloan was supported, otherwise you cant verify the order wasnt separated for other reason

Contributor Author

Yes that can be done.

Though I'd then wrap the setup code into a helper so it isn't duplicated everywhere.

The flashloan is still useful here to show it gets filtered out, but yes I do agree it needs additional comments to this effect.

Contributor Author

Setup code has been wrapped into helpers.

Have implemented singular tests in addition to the mixed case test.

A mixed case test is still useful to have here, have added comments to aid readability.

Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
Comment on lines 126 to 159
                 // Check if uid is unsupported in metrics
                 if matches!(
                     self.metrics
                         .as_ref()
                         .map(|m| m.get_quality(&order.uid, now)),
                     Some(Quality::Unsupported)
                 ) {
                     return Some(order.uid);
                 }
                 let sell = self.get_token_quality(order.sell.token, now);
                 let buy = self.get_token_quality(order.buy.token, now);
                 match (sell, buy) {
                     // both tokens supported => keep order
-                    (Quality::Supported, Quality::Supported) => Some(order),
+                    (Quality::Supported, Quality::Supported) => None,
                     // at least 1 token unsupported => drop order
-                    (Quality::Unsupported, _) | (_, Quality::Unsupported) => {
-                        removed_uids.push(order.uid);
-                        None
-                    }
+                    (Quality::Unsupported, _) | (_, Quality::Unsupported) => Some(order.uid),
                     // sell token quality is unknown => keep order if token is supported
                     (Quality::Unknown, _) => {
                         let Some(detector) = &self.simulation_detector else {
                             // we can't determine quality => assume order is good
-                            return Some(order);
+                            return None;
                         };
                         let check_tokens_fut = async move {
-                            let quality = detector.determine_sell_token_quality(&order, now).await;
-                            (order, quality)
+                            let quality = detector.determine_sell_token_quality(order, now).await;
+                            (order.uid, quality)
                         };
                         token_quality_checks.push(check_tokens_fut);
                         None
                     }
                     // buy token quality is unknown => keep order (because we can't
                     // determine quality and assume it's good)
-                    (_, Quality::Unknown) => Some(order),
+                    (_, Quality::Unknown) => None,
                 }
             })
Contributor

the way you separated the unsupported_order_uids from the retain made the diff much simpler, but now we iterate twice over the orders (one to collect the orders to be removed, another to remove them), if we extract this logic to analyze a single order, won't we be able to perform just the retain while analyzing each order?

comparing to the previous code, that approach change alone (of taking auction orders instead of the full auction to filter just the orders) seems to make it easier to

  1. follow the code
  2. make filtering clearer
  3. unlock some optimizations like avoiding multiple passes over the array
  4. make testing easier too since you can check single orders

let me know your thoughts before handling this comment
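
To make the two shapes concrete, here is a small sketch with hypothetical stand-ins (`Order` and `is_supported` are illustrative; the real check also consults metrics and token quality). The first function collects UIDs and then retains, the second decides per order inside a single `retain`:

```rust
use std::collections::HashSet;

struct Order {
    uid: u32,
    supported: bool,
}

// Hypothetical per-order check standing in for the real quality lookup.
fn is_supported(order: &Order) -> bool {
    order.supported
}

// Two passes: one to collect the UIDs to remove, one to remove them.
fn filter_two_pass(orders: &mut Vec<Order>) {
    let removed: HashSet<u32> = orders
        .iter()
        .filter(|o| !is_supported(o))
        .map(|o| o.uid)
        .collect();
    orders.retain(|o| !removed.contains(&o.uid));
}

// Single pass: analyze each order while retaining.
fn filter_single_pass(orders: &mut Vec<Order>) {
    orders.retain(is_supported);
}
```

One caveat with this sketch: the closure passed to `Vec::retain` is synchronous, so orders whose quality depends on an awaited simulation cannot be decided inside it and would still need separate handling.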

Contributor Author

@metalurgical metalurgical May 5, 2026

iterate twice over the orders (one to collect the orders to be removed, another to remove them),
if we extract this logic to analyze a single order, won't we be able to perform just the retain while analyzing each order?

Yes, we could just do it in filter_unsupported_orders_in_auction_orders(&self, orders: &mut Vec<Order>) without having to go through unsupported_order_uids(&self, orders: &[Order]).

I don't think this will reduce clarity by doing so?

The added tests would be unaffected as well.

Edit: It will very much look like the original code then though. Loop over the orders once, building a replacement set and then doing *orders = replacement. The filter().filter_map() can then also just be a normal for loop which may be clearer

Contributor Author

Have used a for loop here, it is slightly less idiomatic but of the same complexity/performance and easier to read.

It also now only does one pass, flashloans inclusive.

Contributor

it is slightly less idiomatic

Readability > idiomatic

Contributor

@jmg-duarte jmg-duarte May 6, 2026

I'd appreciate it if you stopped closing these out when they're supposed to be open for discussion.

There was a reason I stated:

Please review all the comments before starting coding, there's one in specific I'd appreciate we discuss as it might be possible to separate out of the PR; and make this an incremental change 🚀

Otherwise, we keep chasing a moving goal. I'm trying to discuss an idea to ease the coding burden and you keep writing code that I need to keep reviewing.

And let me remind you that all this still needs to be measured.

Contributor Author

@metalurgical metalurgical May 6, 2026

Understood, please continue.

Contributor

@jmg-duarte jmg-duarte May 6, 2026

Reviewing your code before the for loop with a std::mem::take (which is a great footgun as it's a borrowed vector; before it took ownership over the auction) made me notice that we can probably split this PR further to make everything easier for everyone

  1. Make filter_unsupported_orders_in_auction take &mut Vec<Order> instead of the whole auction + add the tests — no fancy async, just simple changes to create assurance that what comes next will work
  2. Then we could make the filter_map inside filter_unsupported_orders_in_auction a standalone function that returns an OrderQuality enum; using that + whether or not there's a detector we can split the collection of futures into one that doesn't even create a futures unordered and another that does, making the code cleaner
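
For context on the `std::mem::take` footgun mentioned above, a minimal sketch with hypothetical names (`Order`, `filter_with_take`, `filter_in_place`, and the `fail` flag are illustrative only). Taking from a borrowed `Vec` empties the caller's vector, so any exit between the take and the write-back leaves the caller with no orders:

```rust
#[derive(Debug, PartialEq)]
struct Order {
    uid: u32,
    supported: bool,
}

// Footgun: `take` leaves an empty Vec behind the caller's `&mut` borrow.
// An early return before the write-back silently loses every order.
fn filter_with_take(orders: &mut Vec<Order>, fail: bool) -> Result<(), &'static str> {
    let taken = std::mem::take(orders); // caller's Vec is now empty
    if fail {
        return Err("bailed out before writing the orders back");
    }
    *orders = taken.into_iter().filter(|o| o.supported).collect();
    Ok(())
}

// In-place alternative: the caller's Vec is only touched by `retain`,
// so an early return leaves it exactly as it was.
fn filter_in_place(orders: &mut Vec<Order>, fail: bool) -> Result<(), &'static str> {
    if fail {
        return Err("bailed out before filtering");
    }
    orders.retain(|o| o.supported);
    Ok(())
}
```

The same hazard applies in async code if the future is dropped at an await point that sits between the take and the write-back.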

I drafted this

Apply on top of 6045c8b

diff --git a/crates/driver/src/domain/competition/risk_detector/mod.rs b/crates/driver/src/domain/competition/risk_detector/mod.rs
index ca33e0d28..5bf54f709 100644
--- a/crates/driver/src/domain/competition/risk_detector/mod.rs
+++ b/crates/driver/src/domain/competition/risk_detector/mod.rs
@@ -18,12 +18,11 @@
 
 use {
     crate::domain::competition::{
-        Order,
-        order::Uid,
-        risk_detector::bad_tokens::simulation::SellQualityDetector,
+        Order, order::Uid, risk_detector::bad_tokens::simulation::SellQualityDetector,
     },
     eth_domain_types as eth,
-    futures::{StreamExt, stream::FuturesUnordered},
+    futures::{FutureExt, StreamExt, stream::FuturesUnordered},
+    itertools::{Either, Itertools},
     std::{
         collections::{HashMap, HashSet},
         fmt,
@@ -64,6 +63,12 @@ pub struct Detector {
     metrics: Option<bad_orders::metrics::Detector>,
 }
 
+enum OrderQuality {
+    Bad,
+    Good,
+    Unknown,
+}
+
 impl Detector {
     /// Hardcodes tokens as (un)supported based on the provided config. This has
     /// the highest priority when looking up a token's quality.
@@ -114,57 +119,103 @@ impl Detector {
         }
     }
 
-    /// Returns a set of orders uids for orders that are found to be
-    /// unsupported.
-    pub async fn unsupported_order_uids(&self, orders: &[Order]) -> HashSet<Uid> {
-        let now = Instant::now();
-        let mut token_quality_checks = FuturesUnordered::new();
+    fn qualify_order(&self, order: &Order, now: Instant) -> OrderQuality {
+        // Check if uid is unsupported in metrics
+        if matches!(
+            self.metrics
+                .as_ref()
+                .map(|m| m.get_quality(&order.uid, now)),
+            Some(Quality::Unsupported)
+        ) {
+            return OrderQuality::Bad;
+        }
+
+        let sell = self.get_token_quality(order.sell.token, now);
+        let buy = self.get_token_quality(order.buy.token, now);
+        match (sell, buy) {
+            // both tokens supported => keep order
+            (Quality::Supported, Quality::Supported) => OrderQuality::Good,
+            // at least 1 token unsupported => drop order
+            (Quality::Unsupported, _) | (_, Quality::Unsupported) => OrderQuality::Bad,
+            // sell token quality is unknown => keep order if token is supported
+            (Quality::Unknown, _) if self.simulation_detector.is_none() => {
+                // no simulation detector => we can't determine quality, assume order is good
+                OrderQuality::Good
+            }
+            (Quality::Unknown, _) => OrderQuality::Unknown,
+            // buy token quality is unknown => keep order (because we can't
+            // determine quality and assume it's good)
+            (_, Quality::Unknown) => OrderQuality::Good,
+        }
+    }
 
-        let mut removed_uids: HashSet<Uid> = orders
+    fn without_sim_detector(&self, orders: &[Order]) -> HashSet<Uid> {
+        let now = Instant::now();
+        orders
             .iter()
-            .filter_map(|order| {
-                // Check if uid is unsupported in metrics
-                if matches!(
-                    self.metrics
-                        .as_ref()
-                        .map(|m| m.get_quality(&order.uid, now)),
-                    Some(Quality::Unsupported)
-                ) {
-                    return Some(order.uid);
-                }
-                let sell = self.get_token_quality(order.sell.token, now);
-                let buy = self.get_token_quality(order.buy.token, now);
-                match (sell, buy) {
-                    // both tokens supported => keep order
-                    (Quality::Supported, Quality::Supported) => None,
-                    // at least 1 token unsupported => drop order
-                    (Quality::Unsupported, _) | (_, Quality::Unsupported) => Some(order.uid),
-                    // sell token quality is unknown => keep order if token is supported
-                    (Quality::Unknown, _) => {
-                        let Some(detector) = &self.simulation_detector else {
-                            // we can't determine quality => assume order is good
-                            return None;
-                        };
-                        let check_tokens_fut = async move {
-                            let quality = detector.determine_sell_token_quality(order, now).await;
-                            (order.uid, quality)
-                        };
-                        token_quality_checks.push(check_tokens_fut);
-                        None
-                    }
-                    // buy token quality is unknown => keep order (because we can't
-                    // determine quality and assume it's good)
-                    (_, Quality::Unknown) => None,
-                }
+            .filter(|order| {
+                let order_quality = self.qualify_order(order, now);
+                // Without a detector unknown-quality orders are assumed good => only collect bad ones
+                matches!(order_quality, OrderQuality::Bad)
             })
-            .collect();
+            .map(|order| order.uid)
+            .collect()
+    }
+
+    async fn using_simulation_detector(
+        &self,
+        orders: &[Order],
+        detector: &Box<dyn SellQualityDetector>,
+    ) -> HashSet<Uid> {
+        let now = Instant::now();
+
+        let (mut removed_uids, mut token_quality_checks): (HashSet<Uid>, FuturesUnordered<_>) =
+            orders
+                .iter()
+                .filter_map(|order| {
+                    let order_quality = self.qualify_order(order, now);
+                    match order_quality {
+                        OrderQuality::Good => None,
+                        OrderQuality::Bad => Some((order, true)),
+                        OrderQuality::Unknown => Some((order, false)),
+                    }
+                })
+                .partition_map(|(order, quality_known)| {
+                    if quality_known {
+                        Either::Left(order.uid)
+                    } else {
+                        Either::Right(
+                            async move {
+                                let quality =
+                                    detector.determine_sell_token_quality(order, now).await;
+                                (order.uid, quality)
+                            }
+                            .boxed(),
+                        )
+                    }
+                });
 
         while let Some((uid, quality)) = token_quality_checks.next().await {
-            if quality != Quality::Supported {
+            if !matches!(quality, Quality::Supported) {
                 removed_uids.insert(uid);
             }
         }
 
+        removed_uids
+    }
+
+    /// Returns a set of orders uids for orders that are found to be
+    /// unsupported.
+    pub async fn unsupported_order_uids(&self, orders: &[Order]) -> HashSet<Uid> {
+        let removed_uids = match &self.simulation_detector {
+            Some(detector) => {
+                let removed_uids = self.using_simulation_detector(orders, detector).await;
+                detector.evict_outdated_entries();
+                removed_uids
+            }
+            None => self.without_sim_detector(orders),
+        };
+
         if !removed_uids.is_empty() {
             tracing::debug!(
                 orders = ?removed_uids,
@@ -172,10 +223,6 @@ impl Detector {
             );
         }
 
-        if let Some(detector) = &self.simulation_detector {
-            detector.evict_outdated_entries();
-        }
-
         removed_uids
     }
 
@@ -222,15 +269,8 @@ mod tests {
             domain::competition::{
                 Order,
                 order::{
-                    BuyTokenBalance,
-                    Kind,
-                    Partial,
-                    SellTokenBalance,
-                    Side,
-                    Signature,
-                    Uid,
-                    app_data::AppData,
-                    signature,
+                    BuyTokenBalance, Kind, Partial, SellTokenBalance, Side, Signature, Uid,
+                    app_data::AppData, signature,
                 },
                 risk_detector::bad_tokens::simulation::MockSellQualityDetector,
             },
@@ -238,12 +278,7 @@ mod tests {
             util,
         },
         app_data::{
-            AppDataHash,
-            Flashloan,
-            ProtocolAppData,
-            Root,
-            ValidatedAppData,
-            hash_full_app_data,
+            AppDataHash, Flashloan, ProtocolAppData, Root, ValidatedAppData, hash_full_app_data,
         },
         eth_domain_types::{Asset, TokenAmount, U256},
         std::{sync::Arc, time::Duration},

Contributor Author

Will take a look

Contributor Author

Interesting, I think I understand the way you reasoned this and have refactored the code to match it.

I also replaced the std::mem::take with .drain(..).

Let me know what you think of it now.

@metalurgical
Contributor Author

I think the diff is going in a much better direction now, the core logic changes are much simpler and the separation you did made me think; good job

Please review all the comments before starting coding, there's one in specific I'd appreciate we discuss as it might be possible to separate out of the PR; and make this an incremental change 🚀

@jmg-duarte Appreciate the comments. Happy to discuss it.

@metalurgical metalurgical requested a review from jmg-duarte May 5, 2026 21:50
tokio::spawn(
async move {
risk_detector
.without_unsupported_orders(&mut auction.orders, flashloans_enabled)
Contributor

Before this PR, the quote path only ran token quality filtering. Flashloan filtering only happened in solve(). Now, without_unsupported_orders does both, so a solver with flashloans_enabled = false will start returning UnsupportedToken at quote time for flashloan orders. That's probably the right behavior (fail fast instead of silently quoting something the solver can't settle), but it's a semantic change that isn't called out in the PR description and isn't covered by an explicit test.
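
A rough sketch of that semantic change, under stated assumptions: `Order`, `QuoteError`, and `quote` below are hypothetical simplifications (the real quote path is async and works on the driver's own order type), and the assumption that a quote is checked as a single order is mine. It only shows how folding the flashloan check into the shared filter changes what the quote path returns:

```rust
struct Order {
    uid: u32,
    token_supported: bool,
    has_flashloan: bool,
}

#[derive(Debug, PartialEq)]
enum QuoteError {
    UnsupportedToken,
}

// One pass covering both token quality and flashloan support.
fn without_unsupported_orders(orders: &mut Vec<Order>, flashloans_enabled: bool) {
    orders.retain(|o| o.token_supported && (flashloans_enabled || !o.has_flashloan));
}

// Hypothetical quote path: the quoted order goes through the shared filter,
// and being filtered out surfaces as `UnsupportedToken`.
fn quote(order: Order, flashloans_enabled: bool) -> Result<u32, QuoteError> {
    let mut orders = vec![order];
    without_unsupported_orders(&mut orders, flashloans_enabled);
    orders
        .first()
        .map(|o| o.uid)
        .ok_or(QuoteError::UnsupportedToken)
}
```

With `flashloans_enabled = false`, a flashloan order whose tokens are fine is now rejected at quote time, which is the case an explicit test would need to pin down.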

Contributor Author

The change to quote.rs is mentioned in the description, perhaps it was not explicit enough?

Logically the solver config should be used here, unless there is a specific reason not to? Could you confirm the exact behavior that is expected here?

It does not appear that a test can be written for this currently.

Contributor Author

After some digging, have added an explicit test. Changes should be non-breaking.

solver::Liquidity::Fetch => tasks.liquidity.await,
solver::Liquidity::Skip => Arc::new(Vec::new()),
let (auction, liquidity) = tokio::join!(
tokio::spawn(
Contributor

tokio::spawn returns detached JoinHandles. tokio::join! awaits them, but if the outer solve() future is dropped (e.g. client disconnect, overload shed), the spawned tasks keep running to completion. The previous inline tokio::join!(async { … }, future) would have been cancelled with the parent.

Contributor Author

@metalurgical metalurgical May 6, 2026

I see your point. Though this behavior is not new here, it already exists with run_blocking_with_timer.

I would like to suggest that you have a look at #4379. The same concept there can be used for this as well, just couple it with a scopeguard. This is also probably better implemented there as well, else it is duplicating efforts.

Contributor

Wouldn't this be solvable by simply using a CancellationToken + DropGuard?
The PR you're pointing to is huge and the scope/steps need to be discussed
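
For reference, the drop-guard idea can be sketched with the standard library alone. This is an analog, not tokio_util's API: `CancelFlag`, `DropGuard`, and `spawn_worker` are hypothetical names, and a plain thread stands in for a task started with tokio::spawn. The caller holds the guard, and dropping it (for example when the parent is cancelled) tells the detached worker to stop:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical std-only analog of a cancellation token.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }

    fn drop_guard(&self) -> DropGuard {
        DropGuard(self.clone())
    }
}

// Cancels the associated flag when it goes out of scope.
struct DropGuard(CancelFlag);

impl Drop for DropGuard {
    fn drop(&mut self) {
        self.0.cancel();
    }
}

// Detached worker (stand-in for a spawned task): performs steps until it
// either finishes or observes cancellation, and returns the steps done.
fn spawn_worker(flag: CancelFlag, max_steps: u32) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut done = 0;
        while done < max_steps && !flag.is_cancelled() {
            thread::sleep(Duration::from_millis(5));
            done += 1;
        }
        done
    })
}
```

The worker only stops at the points where it checks the flag, so cancellation here is cooperative; an async version would instead race the work against the token's cancellation.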

Contributor Author

@metalurgical metalurgical May 7, 2026

Yes that is it.

The PR I am pointing to already has the cancellation token threaded through. It appears large, but much of it is tests and plumbing. I subsequently opened an issue for it, #4380. So it just needs the drop guard added to it (and patched in here), and then the behavior mentioned above will be handled as well.

Happy to discuss scope and steps further.

@metalurgical metalurgical requested a review from squadgazzz May 6, 2026 21:21
Contributor

@squadgazzz squadgazzz left a comment

Two notes on the c06963a refactor.

Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated
Comment thread crates/driver/src/domain/competition/risk_detector/mod.rs Outdated

Development

Successfully merging this pull request may close these issues.

chore(driver): parallelize without_unsupported_orders execution

4 participants