Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ full = [
"log_throttling",
]
types = []
core_affinity = ["dep:core_affinity2"]
ws = [
# Features ws depends on
"types",
Expand All @@ -39,7 +40,6 @@ ws = [
"dep:http-body-util",
"dep:httpdate",
"dep:webpki-roots",
# "dep:",
]
database = [
# Feature dependencies
Expand Down Expand Up @@ -135,6 +135,7 @@ http-body-util = { version = "0.1", optional = true }
httpdate = { version = "1.0", optional = true }
tracing-throttle = { version = "0.4", features = ["async"], optional = true }
webpki-roots = { version = "0.26", optional = true }
core_affinity2 = { version = "0.15", optional = true }

# OpenTelemetry dependencies (default feature)
opentelemetry = { version = "0.31", features = ["logs"] }
Expand Down
72 changes: 71 additions & 1 deletion src/libs/ws/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,35 @@ impl WebsocketServer {
let num_shards = shard_count();
info!("Starting {} WebSocket shards", num_shards);

let mut cores = vec![];
match get_available_cores() {
Ok(c) if !c.is_empty() => {
info!(
core_count = c.len(),
"Core affinity enabled, {} cores available",
c.len()
);
cores.extend(c);
}
Ok(_) => {
warn!("Core affinity enabled but no cores reported; affinity disabled");
}
Err(e) => {
warn!("Core affinity enabled but get_core_ids() returned {}; affinity disabled", e);
}
}

let mut shard_senders = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
for shard_index in 0..num_shards {
let (tx, rx) = mpsc::channel::<(T::Channel1, SocketAddr)>(256);
let this = Arc::clone(&this);
let states = Arc::clone(&states);
let listener = Arc::clone(&listener);
let cores_for_thread = cores.clone();
std::thread::spawn(move || {
if !set_shard_affinity(shard_index, &cores_for_thread) {
debug!(shard_index, "Shard running without CPU affinity");
}
WebsocketServer::run_shard(this, states, listener, rx);
});
shard_senders.push(tx);
Expand Down Expand Up @@ -506,6 +528,54 @@ fn read_cgroup_v1_quota() -> Option<usize> {
Some(((quota + period - 1) / period) as usize)
}

/// Get the list of available CPU cores for affinity pinning.
#[cfg(feature = "core_affinity")]
fn get_available_cores() -> Result<Vec<core_affinity2::CoreId>> {
Ok(core_affinity2::get_core_ids()?)
}

/// Stub for when core_affinity2 feature is disabled.
#[cfg(not(feature = "core_affinity"))]
fn get_available_cores() -> Result<Vec<()>> {
Ok(vec![])
}

/// Attempt to pin the current thread to a specific CPU core.
/// Returns true if affinity was successfully set, false otherwise.
#[cfg(feature = "core_affinity")]
fn set_shard_affinity(shard_index: usize, available_cores: &[core_affinity2::CoreId]) -> bool {
if available_cores.is_empty() {
warn!("Core affinity requested but no cores available");
return false;
}

let core_idx = shard_index % available_cores.len();
let core_id = &available_cores[core_idx];

if core_id.set_affinity().is_ok() {
debug!(
shard_index,
core_id = ?core_id,
total_cores = available_cores.len(),
"Shard pinned to CPU core"
);
true
} else {
warn!(
shard_index,
core_id = ?core_id,
"Failed to set CPU affinity (platform may not support)"
);
false
}
}

/// Stub for when core_affinity feature is disabled.
#[cfg(not(feature = "core_affinity"))]
fn set_shard_affinity(_: usize, _: &[()]) -> bool {
false
}

pub fn wrap_ws_error<T>(err: Result<T, WsError>) -> Result<T> {
err.map_err(|x| eyre!(x))
}
Expand Down