diff --git a/Cargo.lock b/Cargo.lock index 6fd2d3d..4e92b90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -723,6 +723,19 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "core_affinity2" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a6116213b44f1aef4d562e4587bfbd3b43b8c5ba29993f6c83839daac00b52b" +dependencies = [ + "libafl_core", + "libc", + "rustversion", + "serde", + "windows", +] + [[package]] name = "cpufeatures" version = "0.2.17" @@ -1072,6 +1085,7 @@ dependencies = [ "chrono", "clap", "convert_case 0.6.0", + "core_affinity2", "dashmap", "deadpool-postgres", "eyre", @@ -1913,6 +1927,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" +[[package]] +name = "libafl_core" +version = "0.15.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b4823e79015f1fab46a90fe2577864867ffed2bc54994945a72c5f92a0ac910" +dependencies = [ + "rustversion", + "windows-result", +] + [[package]] name = "libc" version = "0.2.184" @@ -4209,6 +4233,27 @@ dependencies = [ "web-sys", ] +[[package]] +name = "windows" +version = "0.62.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "527fadee13e0c05939a6a05d5bd6eec6cd2e3dbd648b9f8e447c6518133d8580" +dependencies = [ + "windows-collections", + "windows-core", + "windows-future", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23b2d95af1a8a14a3c7367e1ed4fc9c20e0a26e79551b1454d72583c97cc6610" +dependencies = [ + "windows-core", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -4222,6 +4267,17 @@ dependencies = [ "windows-strings", ] +[[package]] +name = "windows-future" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1d6f90251fe18a279739e78025bd6ddc52a7e22f921070ccdc67dde84c605cb" +dependencies = [ + "windows-core", + "windows-link", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -4250,6 +4306,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e2e40844ac143cdb44aead537bbf727de9b044e107a0f1220392177d15b0f26" +dependencies = [ + "windows-core", + "windows-link", +] + [[package]] name = "windows-registry" version = "0.6.1" @@ -4339,6 +4405,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3949bd5b99cafdf1c7ca86b43ca564028dfe27d66958f2470940f73d86d75b37" +dependencies = [ + "windows-link", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 2cb1d62..66c2fa1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ full = [ "log_throttling", ] types = [] +core_affinity = ["dep:core_affinity2"] ws = [ # Features ws depends on "types", @@ -39,7 +40,6 @@ ws = [ "dep:http-body-util", "dep:httpdate", "dep:webpki-roots", - # "dep:", ] database = [ # Feature dependencies @@ -135,6 +135,7 @@ http-body-util = { version = "0.1", optional = true } httpdate = { version = "1.0", optional = true } tracing-throttle = { version = "0.4", features = ["async"], optional = true } webpki-roots = { version = "0.26", optional = true } +core_affinity2 = { version = "0.15", optional = true } # OpenTelemetry dependencies (default feature) opentelemetry = { version = "0.31", features = ["logs"] } diff --git a/src/libs/ws/server.rs b/src/libs/ws/server.rs index 5cabd2e..9c0a7e5 100644 --- a/src/libs/ws/server.rs +++ b/src/libs/ws/server.rs @@ -343,13 +343,35 @@ impl WebsocketServer { let num_shards = shard_count(); info!("Starting {} WebSocket shards", num_shards); + let mut cores = vec![]; + match get_available_cores() { + Ok(c) if !c.is_empty() => { + info!( + core_count = c.len(), + "Core affinity enabled, {} cores available", + c.len() + ); + cores.extend(c); + } + Ok(_) => { + warn!("Core affinity enabled but no cores reported; affinity disabled"); + } + Err(e) => { + warn!("Core affinity enabled but get_core_ids() returned {}; affinity disabled", e); + } + } + let mut shard_senders = Vec::with_capacity(num_shards); - for _ in 0..num_shards { + for shard_index in 0..num_shards { let (tx, rx) = mpsc::channel::<(T::Channel1, SocketAddr)>(256); let this = Arc::clone(&this); let states = Arc::clone(&states); let listener = Arc::clone(&listener); + let cores_for_thread = cores.clone(); std::thread::spawn(move || { + if !set_shard_affinity(shard_index, &cores_for_thread) { + debug!(shard_index, "Shard running without CPU affinity"); + } WebsocketServer::run_shard(this, states, listener, rx); }); shard_senders.push(tx); @@ -506,6 +528,54 @@ fn read_cgroup_v1_quota() -> Option { Some(((quota + period - 1) / period) as usize) } +/// Get the list of available CPU cores for affinity pinning. +#[cfg(feature = "core_affinity")] +fn get_available_cores() -> Result> { + Ok(core_affinity2::get_core_ids()?) +} + +/// Stub for when core_affinity2 feature is disabled. +#[cfg(not(feature = "core_affinity"))] +fn get_available_cores() -> Result> { + Ok(vec![]) +} + +/// Attempt to pin the current thread to a specific CPU core. +/// Returns true if affinity was successfully set, false otherwise. +#[cfg(feature = "core_affinity")] +fn set_shard_affinity(shard_index: usize, available_cores: &[core_affinity2::CoreId]) -> bool { + if available_cores.is_empty() { + warn!("Core affinity requested but no cores available"); + return false; + } + + let core_idx = shard_index % available_cores.len(); + let core_id = &available_cores[core_idx]; + + if core_id.set_affinity().is_ok() { + debug!( + shard_index, + core_id = ?core_id, + total_cores = available_cores.len(), + "Shard pinned to CPU core" + ); + true + } else { + warn!( + shard_index, + core_id = ?core_id, + "Failed to set CPU affinity (platform may not support)" + ); + false + } +} + +/// Stub for when core_affinity feature is disabled. +#[cfg(not(feature = "core_affinity"))] +fn set_shard_affinity(_: usize, _: &[()]) -> bool { + false +} + pub fn wrap_ws_error(err: Result) -> Result { err.map_err(|x| eyre!(x)) }