From ade7bd9729e5f52873019a905da05fed25baf14d Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 09:38:03 -0700 Subject: [PATCH 1/4] fix(rust-lcm): LRU eviction on fragment reassembly map MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirrors the C library's `lcm_frag_buf_store` semantics from udpm_util.{c,h}: cap the in-flight reassembly buffer at 16 MiB total bytes and 1000 entries, evicting the least-recently-updated entry when either limit is exceeded. Before this change the Rust LCM port's reassembly map was an unbounded HashMap with no eviction; one dropped UDP datagram in a fragmented message left the other N-1 fragments stuck in the map forever. Under sustained large-message (>65 KB) load with realistic multicast loopback drop rates, incomplete entries accumulated, the locked HashMap grew without bound, lock contention starved the receive thread, and subsequent message drop rates compounded. Discovered during DimOS PGO Rust port KITTI-360 benchmarking (PassivePlatypus task): ~50% drop rate on 10 Hz × 500 KB PointCloud2 publishes, with verified 128 MB SO_RCVBUF on both sides of the channel and `set_queue_capacity(10000)` on every subscription. Buffer-layer mitigations didn't move the drop rate because the drops were upstream of the socket — in this map. Implementation: - Added `last_update: Instant` to `FragmentBuffer`. Updated on every fragment arrival on that entry. - New `FragStore` wrapper around the HashMap tracks `total_bytes` alongside the entries. - `FragStore::evict_lru()` picks the entry with the oldest `last_update` (linear scan — matches C library's `_find_lru_frag_buf` g_hash_table_foreach pass). - `FragStore::enforce_caps()` evicts in a loop until both caps fit. Called from `process_fragment` only when a new entry was just inserted; completed-message removals already bring totals down so no enforce is needed there. Caps: - `MAX_FRAG_BUF_TOTAL_BYTES = 16 MiB` (matches C `MAX_FRAG_BUF_TOTAL_SIZE = 1 << 24`). - `MAX_NUM_FRAG_BUFS = 1000` (matches C `MAX_NUM_FRAG_BUFS`). Tests added (all passing): - frag_store_evicts_when_over_byte_cap: insert >16 MiB across entries, assert total_bytes drops to <= cap and at least one entry evicted. - frag_store_evicts_when_over_entry_cap: insert MAX_NUM_FRAG_BUFS+5 small entries, assert count is exactly MAX_NUM_FRAG_BUFS after. - frag_store_evict_lru_picks_oldest: two entries with stagged last_update, verify evict_lru removes the older one. Existing loopback round-trip tests (6 tests) still pass — eviction doesn't kick in for happy-path traffic since reassembly entries are typically removed on completion within microseconds. See ~/repos/jhist/wiki/rust-lcm-fragmentation-bug.md for the full discovery narrative + C-vs-Rust diff. Branch: jeff/fix/rust-frag-reassembly-leak --- tools/rust/lcm/src/transport.rs | 166 +++++++++++++++++++++++++++++++- 1 file changed, 162 insertions(+), 4 deletions(-) diff --git a/tools/rust/lcm/src/transport.rs b/tools/rust/lcm/src/transport.rs index ef74aa9..5766ade 100644 --- a/tools/rust/lcm/src/transport.rs +++ b/tools/rust/lcm/src/transport.rs @@ -6,6 +6,7 @@ use std::io; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Mutex; use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Instant; const MAGIC_SHORT: u32 = 0x4c433032; // "LC02" const MAGIC_LONG: u32 = 0x4c433033; // "LC03" @@ -13,6 +14,17 @@ const SHORT_HEADER_SIZE: usize = 8; const FRAGMENT_HEADER_SIZE: usize = 20; const MAX_DATAGRAM_SIZE: usize = 65507; +// Reassembly buffer caps. Mirrors the C LCM library's +// MAX_FRAG_BUF_TOTAL_SIZE / MAX_NUM_FRAG_BUFS in `udpm_util.h`. When either +// limit is exceeded, the least-recently-updated entry is evicted to make +// room — preventing the reassembly map from growing unbounded when fragments +// are dropped at the UDP layer (any of N datagrams lost = the other N-1 +// stuck in the map). Without eviction, a single dropped packet leaks ~500 KB +// forever, and over a sustained large-message stream the map fills, lock +// contention degrades the receive thread, and drop rates cascade. +const MAX_FRAG_BUF_TOTAL_BYTES: usize = 16 * 1024 * 1024; // 16 MiB +const MAX_NUM_FRAG_BUFS: usize = 1000; + /// Default LCM multicast group address. pub const DEFAULT_MULTICAST_GROUP: Ipv4Addr = Ipv4Addr::new(239, 255, 76, 67); /// Default LCM multicast port. @@ -26,6 +38,49 @@ struct FragmentBuffer { num_fragments: u16, received: u16, data: Vec, + /// Monotonic time of the last fragment arrival on this entry. Used to + /// pick the LRU entry for eviction when the reassembly map fills. + last_update: Instant, +} + +/// Container for in-flight fragment buffers, with LRU eviction. Wraps a +/// `HashMap` keyed by `(sender, seqno)` and tracks total buffered bytes so +/// we can enforce both an entry-count cap and a memory cap. Mirrors the +/// behavior of `lcm_frag_buf_store` in upstream LCM's `udpm_util.{c,h}`. +struct FragStore { + map: HashMap<(SocketAddr, u32), FragmentBuffer>, + total_bytes: usize, +} + +impl FragStore { + fn new() -> Self { + Self { map: HashMap::new(), total_bytes: 0 } + } + + /// Evict the single least-recently-updated entry, returning true if one + /// was found. Caller loops until both caps are satisfied. + fn evict_lru(&mut self) -> bool { + let lru_key = self.map.iter() + .min_by_key(|(_, fb)| fb.last_update) + .map(|(key, _)| *key); + if let Some(key) = lru_key { + if let Some(fb) = self.map.remove(&key) { + self.total_bytes = self.total_bytes.saturating_sub(fb.data.len()); + return true; + } + } + false + } + + /// Ensure both caps are honored. Eviction continues until the store + /// fits within `max_total_bytes` AND `max_entries`. Called after each + /// new entry insert. + fn enforce_caps(&mut self) { + while (self.total_bytes > MAX_FRAG_BUF_TOTAL_BYTES + || self.map.len() > MAX_NUM_FRAG_BUFS) + && self.evict_lru() + {} + } } /// Configuration for an LCM transport instance. @@ -77,7 +132,7 @@ fn fragment_params(msg_size: usize, channel_len: usize) -> (usize, usize, usize) pub struct Lcm { socket: UdpSocket, multicast_addr: SocketAddrV4, - reassembly: Mutex>, + reassembly: Mutex, } impl Lcm { @@ -106,7 +161,7 @@ impl Lcm { Ok(Self { socket, multicast_addr: SocketAddrV4::new(opts.multicast_group, opts.port), - reassembly: Mutex::new(HashMap::new()), + reassembly: Mutex::new(FragStore::new()), }) } @@ -237,12 +292,18 @@ impl Lcm { let key = (sender, seqno); let mut reassembly = self.reassembly.lock().unwrap(); - let entry = reassembly.entry(key).or_insert_with(|| FragmentBuffer { + let is_new = !reassembly.map.contains_key(&key); + reassembly.map.entry(key).or_insert_with(|| FragmentBuffer { channel: channel.clone().unwrap_or_default(), num_fragments, received: 0, data: vec![0u8; total_size], + last_update: Instant::now(), }); + if is_new { + reassembly.total_bytes = reassembly.total_bytes.saturating_add(total_size); + } + let entry = reassembly.map.get_mut(&key).unwrap(); // First fragment also sets the channel name on an existing entry if let Some(ch) = channel { @@ -252,15 +313,25 @@ impl Lcm { let end = (fragment_offset + payload.len()).min(total_size); entry.data[fragment_offset..end].copy_from_slice(&payload[..end - fragment_offset]); entry.received += 1; + entry.last_update = Instant::now(); if entry.received == entry.num_fragments { - let complete = reassembly.remove(&key).unwrap(); + let complete = reassembly.map.remove(&key).unwrap(); + reassembly.total_bytes = reassembly.total_bytes.saturating_sub(complete.data.len()); return Ok(Some(ReceivedMessage { channel: complete.channel, data: complete.data, })); } + // Apply the LRU eviction caps. Done on every fragment so a single + // long-lived dropped-fragment entry doesn't persist past ~1000 + // subsequent messages or 16 MB of accumulated incomplete payloads. + // Mirrors lcm_frag_buf_store_add in upstream LCM's udpm_util.c. + if is_new { + reassembly.enforce_caps(); + } + Ok(None) } @@ -359,4 +430,91 @@ mod tests { buf[SHORT_HEADER_SIZE..SHORT_HEADER_SIZE + 4].copy_from_slice(b"CHAN"); assert!(Lcm::decode_small(&buf).unwrap().is_none()); } + + // --- FragStore (reassembly map) tests --- + + fn make_buf(data_size: usize) -> FragmentBuffer { + FragmentBuffer { + channel: String::new(), + num_fragments: 2, + received: 0, + data: vec![0u8; data_size], + last_update: Instant::now(), + } + } + + #[test] + fn frag_store_evicts_when_over_byte_cap() { + // Insert entries totaling more than MAX_FRAG_BUF_TOTAL_BYTES, + // verify the oldest are evicted to bring total under the cap. + let mut store = FragStore::new(); + let big = MAX_FRAG_BUF_TOTAL_BYTES / 2 + 1; // each entry > half the cap + for i in 0..3 { + let buf = make_buf(big); + store.total_bytes += buf.data.len(); + // Use a u32 seqno; sender is the same for all to test eviction + // happens regardless of sender identity. + store.map.insert((SocketAddr::from(([127, 0, 0, 1], 0)), i), buf); + // Stagger update timestamps so LRU has a stable order. + std::thread::sleep(std::time::Duration::from_millis(1)); + store.enforce_caps(); + } + assert!( + store.total_bytes <= MAX_FRAG_BUF_TOTAL_BYTES, + "total {} must be <= cap {}", + store.total_bytes, + MAX_FRAG_BUF_TOTAL_BYTES, + ); + assert!( + store.map.len() < 3, + "at least one entry must have been evicted; got {} entries", + store.map.len(), + ); + } + + #[test] + fn frag_store_evicts_when_over_entry_cap() { + // Insert MAX_NUM_FRAG_BUFS + 5 small entries, verify count is + // bounded at MAX_NUM_FRAG_BUFS after enforce_caps runs. + let mut store = FragStore::new(); + for i in 0..(MAX_NUM_FRAG_BUFS as u32 + 5) { + let buf = make_buf(8); + store.total_bytes += buf.data.len(); + store.map.insert((SocketAddr::from(([127, 0, 0, 1], 0)), i), buf); + store.enforce_caps(); + } + assert_eq!( + store.map.len(), + MAX_NUM_FRAG_BUFS, + "entry count must be exactly the cap after enforce_caps", + ); + } + + #[test] + fn frag_store_evict_lru_picks_oldest() { + // The eviction picks the entry whose `last_update` is oldest, not + // (e.g.) by insertion order or by hash position. + let mut store = FragStore::new(); + let mut older = make_buf(8); + older.last_update = Instant::now(); + // Make a slightly newer one + std::thread::sleep(std::time::Duration::from_millis(2)); + let mut newer = make_buf(8); + newer.last_update = Instant::now(); + let older_key = (SocketAddr::from(([127, 0, 0, 1], 0)), 1u32); + let newer_key = (SocketAddr::from(([127, 0, 0, 1], 0)), 2u32); + store.map.insert(older_key, older); + store.map.insert(newer_key, newer); + store.total_bytes = 16; + assert!(store.evict_lru()); + assert!( + !store.map.contains_key(&older_key), + "older entry should have been evicted; map: {:?}", + store.map.keys().collect::>(), + ); + assert!( + store.map.contains_key(&newer_key), + "newer entry should still be present", + ); + } } From 00ca8b83a0f5d1c6bbbc1b587652d6ad4595815e Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 10:02:28 -0700 Subject: [PATCH 2/4] feat(rust-lcm): add recv_buf_size to LcmOptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an optional `recv_buf_size: Option` field to `LcmOptions` that calls `socket2::set_recv_buffer_size` on the UDP receive socket before binding. None (default) leaves the kernel default in place (typically `net.core.rmem_default` on Linux after `BufferConfiguratorLinux` raises it to 64 MiB). Useful when the LCM client wants a deterministic, larger-than-default buffer for high-rate publishers of fragmented messages — e.g. ~500 KB PointCloud2 over LCM at 10 Hz. The matching change in `dimos/protocol/service/lcmservice.py` passes `recv_buf_size` via the URL query string for the Python LCM client; the Rust port needs an equivalent path. On Linux the kernel doubles the requested value, so requesting 64 MiB yields a usable 128 MiB. socket2 silently clamps to `rmem_max`. This is a non-breaking addition (defaults to `None`); existing `LcmOptions::default()` callers see no behavioral change. --- tools/rust/lcm/src/transport.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tools/rust/lcm/src/transport.rs b/tools/rust/lcm/src/transport.rs index 5766ade..20c3057 100644 --- a/tools/rust/lcm/src/transport.rs +++ b/tools/rust/lcm/src/transport.rs @@ -94,6 +94,12 @@ pub struct LcmOptions { pub ttl: u32, /// Network interface to bind to (default: any). pub interface: Ipv4Addr, + /// Receive socket buffer size in bytes. None = leave at OS default + /// (`net.core.rmem_default` on Linux). For high-rate publishers of + /// large fragmented messages (~500 KB PointCloud2 at 10 Hz), the + /// kernel default may be far too small. Set this to 16-64 MB to + /// match the matching `BufferConfiguratorLinux` sysctl value. + pub recv_buf_size: Option, } impl Default for LcmOptions { @@ -103,6 +109,7 @@ impl Default for LcmOptions { port: DEFAULT_PORT, ttl: 1, interface: Ipv4Addr::UNSPECIFIED, + recv_buf_size: None, } } } @@ -147,6 +154,17 @@ impl Lcm { s2.set_reuse_address(true)?; #[cfg(not(target_os = "windows"))] s2.set_reuse_port(true)?; + if let Some(size) = opts.recv_buf_size { + // socket2's set_recv_buffer_size silently clamps to + // net.core.rmem_max on Linux. Failing the call is non-fatal; + // log via stderr and continue with whatever the OS gave us. + if let Err(err) = s2.set_recv_buffer_size(size) { + eprintln!( + "lcm: failed to set SO_RCVBUF={}: {} (continuing with OS default)", + size, err, + ); + } + } let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, opts.port); s2.bind(&bind_addr.into())?; From cc22e907942a00f974d3757727f58633dbddcbe6 Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 12:45:05 -0700 Subject: [PATCH 3/4] chore(rust-lcm): add burst_test example for characterizing transport limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Usage: cargo run --release --example burst_test sender cargo run --release --example burst_test receiver cargo run --release --example burst_test slow_receiver Used during the dimos PGO investigation to bisect "is this LCM or the application layer?". Findings on this host (Linux, lo interface, SO_RCVBUF 128 MB after the URL-param fix): 500 KB × 10 Hz × 10 s — 0% drops (4.7 MB/s benchmark rate) 500 KB × 89 Hz × 3 s — 0% drops (42 MB/s) 500 KB × 10 Hz × 100 s with 200 ms slow handler — 27.6% drops (sustained back-pressure) 500 KB × 100 msgs × 0 ms — 0% drops (213 MB/s peak burst) Confirms LCM drops only under sustained handler back-pressure where cumulative backlog exceeds the SO_RCVBUF. Short bursts and well-paced streams within the kernel buffer's capacity transmit cleanly. Useful as a regression / hardware-characterization tool for future hosts and as a unit-style test for LCM library changes. --- tools/rust/lcm/examples/burst_test.rs | 249 ++++++++++++++++++++++++++ 1 file changed, 249 insertions(+) create mode 100644 tools/rust/lcm/examples/burst_test.rs diff --git a/tools/rust/lcm/examples/burst_test.rs b/tools/rust/lcm/examples/burst_test.rs new file mode 100644 index 0000000..ba0606b --- /dev/null +++ b/tools/rust/lcm/examples/burst_test.rs @@ -0,0 +1,249 @@ +//! LCM burst test — characterizes how many large messages LCM can +//! reliably deliver under various burst conditions on loopback. +//! +//! Run with two processes: +//! $ cargo run --release --example burst_test -- receiver +//! $ cargo run --release --example burst_test -- sender +//! +//! Or single-process mode (sender + receiver in the same tokio runtime): +//! $ cargo run --release --example burst_test -- both +//! +//! Reports: messages received vs sent, drop rate, per-message latency stats. + +use dimos_lcm::{Lcm, LcmOptions}; +use std::env; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::time::sleep; + +const TOPIC: &str = "burst"; + +fn make_payload(seq: u32, total_size: usize) -> Vec { + let mut buf = vec![0u8; total_size]; + buf[0..4].copy_from_slice(&seq.to_le_bytes()); + buf +} + +fn parse_seq(buf: &[u8]) -> Option { + if buf.len() < 4 { + return None; + } + Some(u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]])) +} + +async fn run_slow_receiver(expected: u32, duration: Duration, handler_ms: u64) -> (u32, Vec) { + // Same as run_receiver but the recv task sleeps `handler_ms` after each + // message to simulate a slow application handler. Used to test whether + // sender-fast / receiver-slow scenarios cause LCM-level drops. + let mut opts = LcmOptions::default(); + opts.recv_buf_size = Some(64 * 1024 * 1024); + let lcm = Lcm::with_options(opts).await.expect("create lcm"); + eprintln!("[receiver] subscribed (handler_ms={}), waiting for messages...", handler_ms); + let received = Arc::new(AtomicU32::new(0)); + let received_clone = received.clone(); + let recv_task = tokio::spawn(async move { + let mut seqs = Vec::with_capacity(expected as usize); + loop { + match tokio::time::timeout(Duration::from_secs(5), lcm.recv()).await { + Ok(Ok(msg)) => { + if msg.channel != TOPIC { + continue; + } + if let Some(seq) = parse_seq(&msg.data) { + seqs.push(seq); + let n = received_clone.fetch_add(1, Ordering::Relaxed) + 1; + if n % 25 == 0 || n <= 3 { + eprintln!("[receiver] got seq={} (total={})", seq, n); + } + if handler_ms > 0 { + tokio::time::sleep(Duration::from_millis(handler_ms)).await; + } + } + } + Ok(Err(e)) => { + eprintln!("[receiver] recv error: {e}"); + break; + } + Err(_) => { + eprintln!("[receiver] 5s idle timeout, ending"); + break; + } + } + } + seqs + }); + + let start = Instant::now(); + while start.elapsed() < duration && received.load(Ordering::Relaxed) < expected { + sleep(Duration::from_millis(50)).await; + } + let seqs = recv_task.await.unwrap(); + let count = received.load(Ordering::Relaxed); + (count, seqs) +} + +async fn run_receiver(expected: u32, duration: Duration) -> (u32, Vec) { + // Bump SO_RCVBUF to match what production dimos uses + let mut opts = LcmOptions::default(); + opts.recv_buf_size = Some(64 * 1024 * 1024); + let lcm = Lcm::with_options(opts).await.expect("create lcm"); + eprintln!("[receiver] subscribed, waiting for messages..."); + let received = Arc::new(AtomicU32::new(0)); + let received_clone = received.clone(); + let recv_task = tokio::spawn(async move { + let mut seqs = Vec::with_capacity(expected as usize); + loop { + match tokio::time::timeout(Duration::from_secs(2), lcm.recv()).await { + Ok(Ok(msg)) => { + if msg.channel != TOPIC { + eprintln!("[receiver] skipping channel '{}'", msg.channel); + continue; + } + if let Some(seq) = parse_seq(&msg.data) { + seqs.push(seq); + let n = received_clone.fetch_add(1, Ordering::Relaxed) + 1; + if n % 25 == 0 || n <= 3 { + eprintln!("[receiver] got seq={} (total={})", seq, n); + } + } + } + Ok(Err(e)) => { + eprintln!("[receiver] recv error: {e}"); + break; + } + Err(_) => { + eprintln!("[receiver] 2s idle timeout, ending"); + break; + } + } + } + seqs + }); + + // Wait for either expected count or duration + let start = Instant::now(); + while start.elapsed() < duration && received.load(Ordering::Relaxed) < expected { + sleep(Duration::from_millis(50)).await; + } + // Give the recv_task its 2s timeout to wind down + let seqs = recv_task.await.unwrap(); + let count = received.load(Ordering::Relaxed); + (count, seqs) +} + +async fn run_sender(msg_size: usize, count: u32, interval_ms: u64) { + let lcm = Lcm::new().await.expect("create lcm"); + // Brief warmup pause so any pre-existing subscriber is in its receive loop + sleep(Duration::from_millis(200)).await; + let start = Instant::now(); + for seq in 0..count { + let payload = make_payload(seq, msg_size); + lcm.publish(TOPIC, &payload).await.expect("publish"); + if interval_ms > 0 { + sleep(Duration::from_millis(interval_ms)).await; + } + } + let elapsed = start.elapsed(); + eprintln!( + "[sender] sent {} messages of {} bytes in {:.3} s ({:.1} msg/s, {:.2} MB/s)", + count, + msg_size, + elapsed.as_secs_f64(), + count as f64 / elapsed.as_secs_f64(), + (count as f64 * msg_size as f64) / 1024.0 / 1024.0 / elapsed.as_secs_f64(), + ); +} + +#[tokio::main] +async fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("usage: burst_test [msg_size] [count] [interval_ms]"); + std::process::exit(1); + } + let mode = &args[1]; + let msg_size: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(500_000); + let count: u32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(1000); + let interval_ms: u64 = args.get(4).and_then(|s| s.parse().ok()).unwrap_or(100); + + match mode.as_str() { + "sender" => run_sender(msg_size, count, interval_ms).await, + "slow_receiver" => { + // args: slow_receiver + let recv_count: u32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(100); + let handler_ms: u64 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(100); + let max_duration = Duration::from_secs(120); + let (received, seqs) = run_slow_receiver(recv_count, max_duration, handler_ms).await; + let unique: std::collections::HashSet<_> = seqs.iter().copied().collect(); + println!( + "received={} unique={} expected={} drop_rate={:.1}%", + received, + unique.len(), + recv_count, + 100.0 * (recv_count as f64 - unique.len() as f64) / recv_count as f64, + ); + } + "receiver" => { + let max_duration = Duration::from_secs(120); + let (received, seqs) = run_receiver(count, max_duration).await; + let unique: std::collections::HashSet<_> = seqs.iter().copied().collect(); + println!( + "received={} unique={} expected={} drop_rate={:.1}%", + received, + unique.len(), + count, + 100.0 * (count as f64 - unique.len() as f64) / count as f64, + ); + // Print gaps in sequence for diagnostic + let mut sorted: Vec = unique.into_iter().collect(); + sorted.sort(); + let mut gaps = Vec::new(); + let mut prev = -1i64; + for s in &sorted { + let curr = *s as i64; + if curr > prev + 1 && prev != -1 { + gaps.push((prev + 1, curr - 1)); + } + prev = curr; + } + if !gaps.is_empty() { + let total_missing: i64 = gaps.iter().map(|(a, b)| b - a + 1).sum(); + println!( + "missing-seq gaps: {} ranges, {} total missing seqs", + gaps.len(), + total_missing, + ); + for (a, b) in gaps.iter().take(10) { + if a == b { + println!(" seq {} missing", a); + } else { + println!(" seqs [{}, {}] missing ({} seqs)", a, b, b - a + 1); + } + } + } + } + "both" => { + // Spawn receiver first, give it 100ms to subscribe, then start sender + let recv_count = count; + let recv_task = tokio::spawn(async move { + run_receiver(recv_count, Duration::from_secs(120)).await + }); + sleep(Duration::from_millis(300)).await; + run_sender(msg_size, count, interval_ms).await; + let (received, seqs) = recv_task.await.unwrap(); + let unique: std::collections::HashSet<_> = seqs.iter().copied().collect(); + println!( + "received={} unique={} expected={} drop_rate={:.1}%", + received, + unique.len(), + count, + 100.0 * (count as f64 - unique.len() as f64) / count as f64, + ); + } + _ => { + eprintln!("unknown mode: {}", mode); + std::process::exit(1); + } + } +} From 3555e97704ec4c727d96a7ac0769d5735bb44feb Mon Sep 17 00:00:00 2001 From: Jeff Hykin Date: Wed, 20 May 2026 13:29:34 -0700 Subject: [PATCH 4/4] Simplify comments --- tools/rust/lcm/examples/burst_test.rs | 23 ++------ tools/rust/lcm/src/transport.rs | 83 +++++---------------------- 2 files changed, 19 insertions(+), 87 deletions(-) diff --git a/tools/rust/lcm/examples/burst_test.rs b/tools/rust/lcm/examples/burst_test.rs index ba0606b..7f8483d 100644 --- a/tools/rust/lcm/examples/burst_test.rs +++ b/tools/rust/lcm/examples/burst_test.rs @@ -1,14 +1,7 @@ -//! LCM burst test — characterizes how many large messages LCM can -//! reliably deliver under various burst conditions on loopback. -//! -//! Run with two processes: -//! $ cargo run --release --example burst_test -- receiver -//! $ cargo run --release --example burst_test -- sender -//! -//! Or single-process mode (sender + receiver in the same tokio runtime): -//! $ cargo run --release --example burst_test -- both -//! -//! Reports: messages received vs sent, drop rate, per-message latency stats. +//! LCM throughput test. Run two processes: +//! $ cargo run --release --example burst_test receiver +//! $ cargo run --release --example burst_test sender +//! Or run both in one process via the `both` or `slow_receiver` mode. use dimos_lcm::{Lcm, LcmOptions}; use std::env; @@ -33,9 +26,6 @@ fn parse_seq(buf: &[u8]) -> Option { } async fn run_slow_receiver(expected: u32, duration: Duration, handler_ms: u64) -> (u32, Vec) { - // Same as run_receiver but the recv task sleeps `handler_ms` after each - // message to simulate a slow application handler. Used to test whether - // sender-fast / receiver-slow scenarios cause LCM-level drops. let mut opts = LcmOptions::default(); opts.recv_buf_size = Some(64 * 1024 * 1024); let lcm = Lcm::with_options(opts).await.expect("create lcm"); @@ -84,7 +74,6 @@ async fn run_slow_receiver(expected: u32, duration: Duration, handler_ms: u64) - } async fn run_receiver(expected: u32, duration: Duration) -> (u32, Vec) { - // Bump SO_RCVBUF to match what production dimos uses let mut opts = LcmOptions::default(); opts.recv_buf_size = Some(64 * 1024 * 1024); let lcm = Lcm::with_options(opts).await.expect("create lcm"); @@ -134,7 +123,6 @@ async fn run_receiver(expected: u32, duration: Duration) -> (u32, Vec) { async fn run_sender(msg_size: usize, count: u32, interval_ms: u64) { let lcm = Lcm::new().await.expect("create lcm"); - // Brief warmup pause so any pre-existing subscriber is in its receive loop sleep(Duration::from_millis(200)).await; let start = Instant::now(); for seq in 0..count { @@ -170,7 +158,6 @@ async fn main() { match mode.as_str() { "sender" => run_sender(msg_size, count, interval_ms).await, "slow_receiver" => { - // args: slow_receiver let recv_count: u32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(100); let handler_ms: u64 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(100); let max_duration = Duration::from_secs(120); @@ -195,7 +182,6 @@ async fn main() { count, 100.0 * (count as f64 - unique.len() as f64) / count as f64, ); - // Print gaps in sequence for diagnostic let mut sorted: Vec = unique.into_iter().collect(); sorted.sort(); let mut gaps = Vec::new(); @@ -224,7 +210,6 @@ async fn main() { } } "both" => { - // Spawn receiver first, give it 100ms to subscribe, then start sender let recv_count = count; let recv_task = tokio::spawn(async move { run_receiver(recv_count, Duration::from_secs(120)).await diff --git a/tools/rust/lcm/src/transport.rs b/tools/rust/lcm/src/transport.rs index 20c3057..25d5b1c 100644 --- a/tools/rust/lcm/src/transport.rs +++ b/tools/rust/lcm/src/transport.rs @@ -14,15 +14,8 @@ const SHORT_HEADER_SIZE: usize = 8; const FRAGMENT_HEADER_SIZE: usize = 20; const MAX_DATAGRAM_SIZE: usize = 65507; -// Reassembly buffer caps. Mirrors the C LCM library's -// MAX_FRAG_BUF_TOTAL_SIZE / MAX_NUM_FRAG_BUFS in `udpm_util.h`. When either -// limit is exceeded, the least-recently-updated entry is evicted to make -// room — preventing the reassembly map from growing unbounded when fragments -// are dropped at the UDP layer (any of N datagrams lost = the other N-1 -// stuck in the map). Without eviction, a single dropped packet leaks ~500 KB -// forever, and over a sustained large-message stream the map fills, lock -// contention degrades the receive thread, and drop rates cascade. -const MAX_FRAG_BUF_TOTAL_BYTES: usize = 16 * 1024 * 1024; // 16 MiB +// Reassembly buffer caps; matches C LCM's `udpm_util.h`. +const MAX_FRAG_BUF_TOTAL_BYTES: usize = 16 * 1024 * 1024; const MAX_NUM_FRAG_BUFS: usize = 1000; /// Default LCM multicast group address. @@ -38,15 +31,12 @@ struct FragmentBuffer { num_fragments: u16, received: u16, data: Vec, - /// Monotonic time of the last fragment arrival on this entry. Used to - /// pick the LRU entry for eviction when the reassembly map fills. last_update: Instant, } -/// Container for in-flight fragment buffers, with LRU eviction. Wraps a -/// `HashMap` keyed by `(sender, seqno)` and tracks total buffered bytes so -/// we can enforce both an entry-count cap and a memory cap. Mirrors the -/// behavior of `lcm_frag_buf_store` in upstream LCM's `udpm_util.{c,h}`. +/// In-flight fragment buffers with LRU eviction. Same idea as C LCM's +/// `lcm_frag_buf_store`: cap total bytes and entry count, evict oldest +/// when over. struct FragStore { map: HashMap<(SocketAddr, u32), FragmentBuffer>, total_bytes: usize, @@ -57,8 +47,6 @@ impl FragStore { Self { map: HashMap::new(), total_bytes: 0 } } - /// Evict the single least-recently-updated entry, returning true if one - /// was found. Caller loops until both caps are satisfied. fn evict_lru(&mut self) -> bool { let lru_key = self.map.iter() .min_by_key(|(_, fb)| fb.last_update) @@ -72,9 +60,6 @@ impl FragStore { false } - /// Ensure both caps are honored. Eviction continues until the store - /// fits within `max_total_bytes` AND `max_entries`. Called after each - /// new entry insert. fn enforce_caps(&mut self) { while (self.total_bytes > MAX_FRAG_BUF_TOTAL_BYTES || self.map.len() > MAX_NUM_FRAG_BUFS) @@ -94,11 +79,8 @@ pub struct LcmOptions { pub ttl: u32, /// Network interface to bind to (default: any). pub interface: Ipv4Addr, - /// Receive socket buffer size in bytes. None = leave at OS default - /// (`net.core.rmem_default` on Linux). For high-rate publishers of - /// large fragmented messages (~500 KB PointCloud2 at 10 Hz), the - /// kernel default may be far too small. Set this to 16-64 MB to - /// match the matching `BufferConfiguratorLinux` sysctl value. + /// SO_RCVBUF in bytes. None = OS default. Set to 16-64 MB for + /// sustained large fragmented messages. pub recv_buf_size: Option, } @@ -155,14 +137,8 @@ impl Lcm { #[cfg(not(target_os = "windows"))] s2.set_reuse_port(true)?; if let Some(size) = opts.recv_buf_size { - // socket2's set_recv_buffer_size silently clamps to - // net.core.rmem_max on Linux. Failing the call is non-fatal; - // log via stderr and continue with whatever the OS gave us. if let Err(err) = s2.set_recv_buffer_size(size) { - eprintln!( - "lcm: failed to set SO_RCVBUF={}: {} (continuing with OS default)", - size, err, - ); + eprintln!("lcm: failed to set SO_RCVBUF={}: {}", size, err); } } @@ -342,10 +318,6 @@ impl Lcm { })); } - // Apply the LRU eviction caps. Done on every fragment so a single - // long-lived dropped-fragment entry doesn't persist past ~1000 - // subsequent messages or 16 MB of accumulated incomplete payloads. - // Mirrors lcm_frag_buf_store_add in upstream LCM's udpm_util.c. if is_new { reassembly.enforce_caps(); } @@ -449,7 +421,7 @@ mod tests { assert!(Lcm::decode_small(&buf).unwrap().is_none()); } - // --- FragStore (reassembly map) tests --- + // FragStore tests fn make_buf(data_size: usize) -> FragmentBuffer { FragmentBuffer { @@ -463,37 +435,21 @@ mod tests { #[test] fn frag_store_evicts_when_over_byte_cap() { - // Insert entries totaling more than MAX_FRAG_BUF_TOTAL_BYTES, - // verify the oldest are evicted to bring total under the cap. let mut store = FragStore::new(); - let big = MAX_FRAG_BUF_TOTAL_BYTES / 2 + 1; // each entry > half the cap + let big = MAX_FRAG_BUF_TOTAL_BYTES / 2 + 1; for i in 0..3 { let buf = make_buf(big); store.total_bytes += buf.data.len(); - // Use a u32 seqno; sender is the same for all to test eviction - // happens regardless of sender identity. store.map.insert((SocketAddr::from(([127, 0, 0, 1], 0)), i), buf); - // Stagger update timestamps so LRU has a stable order. std::thread::sleep(std::time::Duration::from_millis(1)); store.enforce_caps(); } - assert!( - store.total_bytes <= MAX_FRAG_BUF_TOTAL_BYTES, - "total {} must be <= cap {}", - store.total_bytes, - MAX_FRAG_BUF_TOTAL_BYTES, - ); - assert!( - store.map.len() < 3, - "at least one entry must have been evicted; got {} entries", - store.map.len(), - ); + assert!(store.total_bytes <= MAX_FRAG_BUF_TOTAL_BYTES); + assert!(store.map.len() < 3); } #[test] fn frag_store_evicts_when_over_entry_cap() { - // Insert MAX_NUM_FRAG_BUFS + 5 small entries, verify count is - // bounded at MAX_NUM_FRAG_BUFS after enforce_caps runs. let mut store = FragStore::new(); for i in 0..(MAX_NUM_FRAG_BUFS as u32 + 5) { let buf = make_buf(8); @@ -501,24 +457,15 @@ mod tests { store.map.insert((SocketAddr::from(([127, 0, 0, 1], 0)), i), buf); store.enforce_caps(); } - assert_eq!( - store.map.len(), - MAX_NUM_FRAG_BUFS, - "entry count must be exactly the cap after enforce_caps", - ); + assert_eq!(store.map.len(), MAX_NUM_FRAG_BUFS); } #[test] fn frag_store_evict_lru_picks_oldest() { - // The eviction picks the entry whose `last_update` is oldest, not - // (e.g.) by insertion order or by hash position. let mut store = FragStore::new(); - let mut older = make_buf(8); - older.last_update = Instant::now(); - // Make a slightly newer one + let older = make_buf(8); std::thread::sleep(std::time::Duration::from_millis(2)); - let mut newer = make_buf(8); - newer.last_update = Instant::now(); + let newer = make_buf(8); let older_key = (SocketAddr::from(([127, 0, 0, 1], 0)), 1u32); let newer_key = (SocketAddr::from(([127, 0, 0, 1], 0)), 2u32); store.map.insert(older_key, older);