diff --git a/Cargo.lock b/Cargo.lock index b11a3ad9..0115e1fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5468,7 +5468,7 @@ dependencies = [ [[package]] name = "rbuilder-primitives" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder?rev=e3b49692d5b4353c62abe828245a44c390f7bec2#e3b49692d5b4353c62abe828245a44c390f7bec2" +source = "git+https://github.com/flashbots/rbuilder?rev=4132e6d7077de51b5f314c687ebaa760f52471c0#4132e6d7077de51b5f314c687ebaa760f52471c0" dependencies = [ "ahash", "alloy-consensus", @@ -5509,7 +5509,7 @@ dependencies = [ [[package]] name = "rbuilder-utils" version = "0.1.0" -source = "git+https://github.com/flashbots/rbuilder?rev=e3b49692d5b4353c62abe828245a44c390f7bec2#e3b49692d5b4353c62abe828245a44c390f7bec2" +source = "git+https://github.com/flashbots/rbuilder?rev=4132e6d7077de51b5f314c687ebaa760f52471c0#4132e6d7077de51b5f314c687ebaa760f52471c0" dependencies = [ "alloy-primitives 1.5.0", "clickhouse", diff --git a/Cargo.toml b/Cargo.toml index 50368032..a558dc8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ revm-primitives = { version = "21.0.2", default-features = false } revm-interpreter = { version = "29.0.1", default-features = false } # rbuilder -rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2" } -rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "e3b49692d5b4353c62abe828245a44c390f7bec2", features = [ +rbuilder-primitives = { git = "https://github.com/flashbots/rbuilder", rev = "4132e6d7077de51b5f314c687ebaa760f52471c0" } +rbuilder-utils = { git = "https://github.com/flashbots/rbuilder", rev = "4132e6d7077de51b5f314c687ebaa760f52471c0", features = [ "test-utils" ] } diff --git a/fixtures/create_bundles_table.sql b/fixtures/create_bundles_table.sql index 49c00941..f76b6b7c 100644 --- a/fixtures/create_bundles_table.sql +++ b/fixtures/create_bundles_table.sql @@ -33,6 +33,7 @@ CREATE TABLE bundles ( `refund_percent` Nullable(UInt8), `refund_recipient` Nullable(FixedString(20)), `delayed_refund` Nullable(Bool), + `disable_cross_region_sharing` Bool, `refund_identity` Nullable(FixedString(20)), `signer_address` Nullable(FixedString(20)), diff --git a/src/builderhub/client.rs b/src/builderhub/client.rs index c124e45d..39e6fa2b 100644 --- a/src/builderhub/client.rs +++ b/src/builderhub/client.rs @@ -29,10 +29,14 @@ impl Client { } /// Register the given signer address with the BuilderHub peer store. - pub async fn register(&self, signer_address: Address) -> Result<(), ClientRegisterError> { + pub async fn register( + &self, + signer_address: Address, + region: String, + ) -> Result<(), ClientRegisterError> { let endpoint = format!("{}/api/l1-builder/v1/register_credentials/orderflow_proxy", self.url); - let body = PeerCredentials { tls_cert: None, ecdsa_pubkey_address: signer_address }; + let body = PeerCredentials { tls_cert: None, ecdsa_pubkey_address: signer_address, region }; let response = self.inner.post(endpoint).json(&body).send().await?; let status = response.status(); if !status.is_success() { diff --git a/src/builderhub/mod.rs b/src/builderhub/mod.rs index cd431c14..5c9abe63 100644 --- a/src/builderhub/mod.rs +++ b/src/builderhub/mod.rs @@ -57,6 +57,9 @@ pub struct PeerCredentials { pub tls_cert: Option, /// Orderflow signer public key. pub ecdsa_pubkey_address: Address, + /// Region of the builder. + #[serde(default, skip_serializing_if = "String::is_empty")] + pub region: String, } /// A [`Peer`] is a builder inside Builderhub. This holds informations about a builder peer, as @@ -135,7 +138,12 @@ impl LocalPeerStore { Self { builders: Arc::new(DashMap::new()) } } - pub fn register(&self, signer_address: Address, port: Option) -> LocalPeerStore { + pub fn register( + &self, + signer_address: Address, + port: Option, + region: String, + ) -> LocalPeerStore { self.builders.insert( signer_address.to_string(), Peer { @@ -147,6 +155,7 @@ impl LocalPeerStore { orderflow_proxy: PeerCredentials { tls_cert: None, ecdsa_pubkey_address: signer_address, + region, }, instance: InstanceData { tls_cert: "".to_string() }, }, diff --git a/src/cli.rs b/src/cli.rs index 2bbc1120..05b5dd00 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,4 +1,6 @@ -use std::{convert::Infallible, net::SocketAddr, num::NonZero, path::PathBuf, str::FromStr}; +use std::{ + convert::Infallible, fmt::Display, net::SocketAddr, num::NonZero, path::PathBuf, str::FromStr, +}; use alloy_primitives::Address; use alloy_signer_local::PrivateKeySigner; @@ -16,6 +18,24 @@ use crate::{ /// The maximum request size in bytes (10 MiB). const MAX_REQUEST_SIZE_BYTES: usize = 10 * 1024 * 1024; +/// Possible config regions +#[derive(Debug, Clone, clap::ValueEnum)] +pub enum Region { + US, + EU, + AP, +} + +impl Display for Region { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::US => write!(f, "us"), + Self::EU => write!(f, "eu"), + Self::AP => write!(f, "ap"), + } + } +} + /// Arguments required to create a clickhouse client. #[derive(PartialEq, Eq, Clone, Debug, Args)] #[group(id = "clickhouse", requires_all = ["CLICKHOUSE_HOST", "CLICKHOUSE_USERNAME", "CLICKHOUSE_PASSWORD", "CLICKHOUSE_DATABASE"])] @@ -210,6 +230,10 @@ pub struct OrderflowIngressArgs { #[clap(long, env = "BUILDERNET_NODE_NAME", id = "BUILDERNET_NODE_NAME", value_parser = replace_dashes_with_underscores)] pub builder_name: String, + /// Region of the builder. + #[clap(long, value_enum, env = "BUILDER_REGION", id = "BUILDER_REGION")] + pub builder_region: Region, + /// The URL of BuilderHub. #[clap(long, value_hint = ValueHint::Url, env = "BUILDERHUB_ENDPOINT", id = "BUILDERHUB_ENDPOINT")] pub builder_hub_url: Option, @@ -350,6 +374,7 @@ impl Default for OrderflowIngressArgs { builder_url: None, builder_ready_endpoint: None, builder_name: String::from("buildernet"), + builder_region: Region::US, builder_hub_url: None, flashbots_signer: None, max_txs_per_bundle: 100, @@ -506,6 +531,8 @@ mod tests { "http://localhost:3000", "--builder-name", "buildernet", + "--builder-region", + "us", ]; let args = OrderflowIngressArgs::try_parse_from(args) @@ -535,6 +562,8 @@ mod tests { "http://localhost:3000", "--builder-name", "buildernet", + "--builder-region", + "us", "--indexer.clickhouse.host", "http://127.0.0.1:12345", ]; @@ -567,6 +596,8 @@ mod tests { "http://localhost:3000", "--builder-name", "buildernet", + "--builder-region", + "us", "--indexer.clickhouse.host", "http://127.0.0.1:12345", "--indexer.clickhouse.database", @@ -613,6 +644,8 @@ mod tests { "http://localhost:3000", "--builder-name", "buildernet", + "--builder-region", + "us", "--indexer.parquet.bundle-receipts-file-path", "pronto.parquet", ]; @@ -647,6 +680,8 @@ mod tests { "http://localhost:3000", "--builder-name", "buildernet", + "--builder-region", + "us", "--indexer.parquet.bundle-receipts-file-path", "pronto.parquet", "--indexer.clickhouse.host", diff --git a/src/forwarder/mod.rs b/src/forwarder/mod.rs index 56edd2e8..fc683063 100644 --- a/src/forwarder/mod.rs +++ b/src/forwarder/mod.rs @@ -49,6 +49,9 @@ pub struct IngressForwarders { peers: Arc>, /// The priority workers for signing requests. workers: PriorityWorkers, + /// The region of the local builder. Used to filter peers when bundles disable + /// cross-region sharing. + region: String, } impl IngressForwarders { @@ -58,8 +61,9 @@ impl IngressForwarders { peers: Arc>, signer: PrivateKeySigner, workers: PriorityWorkers, + region: String, ) -> Self { - Self { local, peers, signer, workers } + Self { local, peers, signer, workers, region } } /// Find peer name by address. @@ -78,6 +82,11 @@ impl IngressForwarders { let priority = order.priority(); let method_name = order.method_name().to_string(); + let restrict_to_local_region = if let SystemOrder::Bundle(ref bundle) = order { + bundle.raw_bundle.metadata.disable_cross_region_sharing + } else { + false + }; // Start with JSON-RPC encoding, that's needed for the local builder anyway. let mut encoded_order = order.clone().encode(); @@ -120,12 +129,16 @@ impl IngressForwarders { let forward = Arc::new(ForwardingRequest::user_to_system(encoded_order.into(), headers)); debug!(peers = %self.peers.len(), "sending order to peers"); - self.broadcast_inner(forward); + self.broadcast_inner(forward, restrict_to_local_region); } - /// Broadcast request to all peers. - fn broadcast_inner(&self, forward: Arc) { + /// Broadcast request to all peers. When `restrict_to_local_region` is true, peers in a + /// different region than the local builder are skipped (but kept in the peer map). + fn broadcast_inner(&self, forward: Arc, restrict_to_local_region: bool) { self.peers.retain(|peer, handle| { + if restrict_to_local_region && handle.info.orderflow_proxy.region != self.region { + return true; + } if let Err(e) = handle.sender.send(forward.priority(), forward.clone()) { error!(?e, %peer, "peer channel closed, removing peer"); @@ -364,3 +377,100 @@ impl Default for LogRateLimiter { Self::new(Duration::from_millis(100)) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + builderhub::{InstanceData, Peer, PeerCredentials}, + primitives::{EncodedOrder, RawOrderMetadata, UtcInstant, WithEncoding}, + priority::Priority, + }; + use alloy_primitives::{Address, B256}; + + fn make_peer_with_region( + region: &str, + ) -> (Peer, PeerHandle, priority::channel::UnboundedReceiver>) { + let info = Peer { + name: region.to_string(), + ip: "127.0.0.1".to_string(), + dns_name: String::new(), + orderflow_proxy: PeerCredentials { + tls_cert: None, + ecdsa_pubkey_address: Address::ZERO, + region: region.to_string(), + }, + instance: InstanceData { tls_cert: String::new() }, + }; + let (tx, rx) = priority::channel::unbounded_channel(); + (info.clone(), PeerHandle { info, sender: tx }, rx) + } + + fn dummy_forward() -> Arc { + let order = EncodedOrder::Raw(WithEncoding { + inner: RawOrderMetadata { + priority: Priority::Medium, + received_at: UtcInstant::now(), + hash: B256::ZERO, + }, + encoding: Arc::new(Vec::new()), + encoding_tcp_forwarder: None, + }); + Arc::new(ForwardingRequest::user_to_local(order)) + } + + fn forwarders_with( + local_region: &str, + peers: Arc>, + ) -> IngressForwarders { + let (local_tx, _) = priority::channel::unbounded_channel(); + IngressForwarders::new( + local_tx, + peers, + alloy_signer_local::PrivateKeySigner::random(), + PriorityWorkers::new_with_threads(1), + local_region.to_string(), + ) + } + + async fn try_recv( + rx: &mut priority::channel::UnboundedReceiver>, + ) -> bool { + tokio::time::timeout(Duration::from_millis(50), rx.recv()).await.is_ok() + } + + #[tokio::test] + async fn broadcast_inner_skips_cross_region_when_restricted() { + let peers: Arc> = Arc::new(DashMap::new()); + let (_, us_handle, mut us_rx) = make_peer_with_region("us"); + let (_, eu_handle, mut eu_rx) = make_peer_with_region("eu"); + peers.insert("us".to_string(), us_handle); + peers.insert("eu".to_string(), eu_handle); + + let forwarders = forwarders_with("us", peers.clone()); + + forwarders.broadcast_inner(dummy_forward(), true); + + assert!(try_recv(&mut us_rx).await, "same-region peer should receive"); + assert!(!try_recv(&mut eu_rx).await, "cross-region peer should be skipped"); + + // Both peers must remain in the map (skip != evict). + assert_eq!(peers.len(), 2); + } + + #[tokio::test] + async fn broadcast_inner_sends_to_all_when_unrestricted() { + let peers: Arc> = Arc::new(DashMap::new()); + let (_, us_handle, mut us_rx) = make_peer_with_region("us"); + let (_, eu_handle, mut eu_rx) = make_peer_with_region("eu"); + peers.insert("us".to_string(), us_handle); + peers.insert("eu".to_string(), eu_handle); + + let forwarders = forwarders_with("us", peers.clone()); + + forwarders.broadcast_inner(dummy_forward(), false); + + assert!(try_recv(&mut us_rx).await); + assert!(try_recv(&mut eu_rx).await); + } +} diff --git a/src/indexer/click/models.rs b/src/indexer/click/models.rs index 8aad8170..6469b40d 100644 --- a/src/indexer/click/models.rs +++ b/src/indexer/click/models.rs @@ -104,6 +104,8 @@ pub struct BundleRow { pub refund_recipient: Option
, /// Whether the bundle has a delayed refund. pub delayed_refund: Option, + /// If bundle disallows sending to other regions + pub disable_cross_region_sharing: bool, /// For 2nd price refunds done by buildernet #[serde(with = "address::option")] pub refund_identity: Option
, @@ -251,6 +253,10 @@ impl From<(SystemBundle, String)> for BundleRow { reverting_tx_hashes: bundle.raw_bundle.metadata.reverting_tx_hashes.clone(), dropping_tx_hashes: bundle.raw_bundle.metadata.dropping_tx_hashes.clone(), delayed_refund: bundle.raw_bundle.metadata.delayed_refund, + disable_cross_region_sharing: bundle + .raw_bundle + .metadata + .disable_cross_region_sharing, refund_tx_hashes: bundle .raw_bundle .metadata @@ -318,6 +324,10 @@ impl From<(SystemBundle, String)> for BundleRow { signer_address: Some(bundle.metadata.signer), builder_name, delayed_refund: bundle.raw_bundle.metadata.delayed_refund, + disable_cross_region_sharing: bundle + .raw_bundle + .metadata + .disable_cross_region_sharing, refund_percent: bundle.raw_bundle.metadata.refund_percent, refund_recipient: bundle.raw_bundle.metadata.refund_recipient, refund_identity: bundle.raw_bundle.metadata.refund_identity, @@ -568,7 +578,9 @@ pub(crate) mod tests { Some("v2".to_string()) }, signing_address: value.signer_address, + // TODO: looks like a bug delayed_refund: None, + disable_cross_region_sharing: value.disable_cross_region_sharing, }, } } diff --git a/src/lib.rs b/src/lib.rs index a4a879a8..d0478470 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -151,10 +151,12 @@ pub async fn run_with_listeners( private_key_pem_file: args.private_key_pem_file, }; + let builder_region = args.builder_region.to_string(); + if let Some(builder_hub_url) = args.builder_hub_url { tracing::debug!(url = builder_hub_url, "Running with BuilderHub"); let builder_hub = builderhub::Client::new(builder_hub_url); - builder_hub.register(local_signer).await?; + builder_hub.register(local_signer, builder_region.clone()).await?; let peer_updater = PeersUpdater::new( peer_update_config, @@ -169,8 +171,11 @@ pub async fn run_with_listeners( tracing::warn!("No BuilderHub URL provided, running with local peer store"); let local_peer_store = LOCAL_PEER_STORE.clone(); - let peer_store = local_peer_store - .register(local_signer, Some(system_listener.local_addr().expect("bound").port())); + let peer_store = local_peer_store.register( + local_signer, + Some(system_listener.local_addr().expect("bound").port()), + builder_region.clone(), + ); let peers = peers.clone(); let peer_updater = @@ -192,11 +197,23 @@ pub async fn run_with_listeners( &task_executor, )?; - IngressForwarders::new(local_sender, peers, orderflow_signer, workers.clone()) + IngressForwarders::new( + local_sender, + peers, + orderflow_signer, + workers.clone(), + builder_region.clone(), + ) } else { // No builder URL provided, so mock local forwarder. let (local_sender, _) = priority::channel::unbounded_channel(); - IngressForwarders::new(local_sender, peers, orderflow_signer, workers.clone()) + IngressForwarders::new( + local_sender, + peers, + orderflow_signer, + workers.clone(), + builder_region.clone(), + ) }; let ingress = Arc::new(OrderflowIngress::new(config, workers, forwarders, indexer_handle)); diff --git a/src/primitives/mod.rs b/src/primitives/mod.rs index d4cc904d..5b6f81c6 100644 --- a/src/primitives/mod.rs +++ b/src/primitives/mod.rs @@ -115,6 +115,7 @@ impl BundleHash for RawBundle { max_timestamp, delayed_refund, block_number, + disable_cross_region_sharing, signing_address: _, // NOTE: If we call `hash`, this should not be set. bundle_hash: _, @@ -184,6 +185,10 @@ impl BundleHash for RawBundle { if let Some(delayed_refund) = delayed_refund { delayed_refund.hash(state); } + + if *disable_cross_region_sharing { + disable_cross_region_sharing.hash(state); + } } let mut hasher = wyhash::WyHash::default(); @@ -686,6 +691,7 @@ pub struct RawBundleMetadataBitcode { pub refund_tx_hashes: Option>, pub delayed_refund: Option, + pub disable_cross_region_sharing: bool, pub bundle_hash: Option<[u8; 32]>, } @@ -718,6 +724,7 @@ impl From<&RawBundleMetadata> for RawBundleMetadataBitcode { .map(|hashes| hashes.into_iter().map(|h| *h).collect()), delayed_refund: r.delayed_refund, + disable_cross_region_sharing: r.disable_cross_region_sharing, bundle_hash: r.bundle_hash.map(|b| b.0), } } @@ -750,6 +757,7 @@ impl From for RawBundleMetadata { .map(|hashes| hashes.into_iter().map(B256::from).collect()), delayed_refund: r.delayed_refund, + disable_cross_region_sharing: r.disable_cross_region_sharing, bundle_hash: r.bundle_hash.map(B256::from), } } @@ -897,6 +905,7 @@ mod tests { refund_recipient: None, refund_tx_hashes: None, delayed_refund: None, + disable_cross_region_sharing: false, bundle_hash: None, }, }; diff --git a/src/utils.rs b/src/utils.rs index 99a4c368..afaf9451 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -293,6 +293,7 @@ pub mod testutils { refund_percent: Some(rng.random_range(0..100)), refund_recipient: Some(Address::random_with(rng)), delayed_refund: None, + disable_cross_region_sharing: false, bundle_hash: None, }, }