diff --git a/tools/rust/lcm/examples/burst_test.rs b/tools/rust/lcm/examples/burst_test.rs new file mode 100644 index 0000000..7f8483d --- /dev/null +++ b/tools/rust/lcm/examples/burst_test.rs @@ -0,0 +1,234 @@ +//! LCM throughput test. Run two processes: +//! $ cargo run --release --example burst_test receiver +//! $ cargo run --release --example burst_test sender +//! Or run both in one process via the `both` or `slow_receiver` mode. + +use dimos_lcm::{Lcm, LcmOptions}; +use std::env; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::time::sleep; + +const TOPIC: &str = "burst"; + +fn make_payload(seq: u32, total_size: usize) -> Vec { + let mut buf = vec![0u8; total_size]; + buf[0..4].copy_from_slice(&seq.to_le_bytes()); + buf +} + +fn parse_seq(buf: &[u8]) -> Option { + if buf.len() < 4 { + return None; + } + Some(u32::from_le_bytes([buf[0], buf[1], buf[2], buf[3]])) +} + +async fn run_slow_receiver(expected: u32, duration: Duration, handler_ms: u64) -> (u32, Vec) { + let mut opts = LcmOptions::default(); + opts.recv_buf_size = Some(64 * 1024 * 1024); + let lcm = Lcm::with_options(opts).await.expect("create lcm"); + eprintln!("[receiver] subscribed (handler_ms={}), waiting for messages...", handler_ms); + let received = Arc::new(AtomicU32::new(0)); + let received_clone = received.clone(); + let recv_task = tokio::spawn(async move { + let mut seqs = Vec::with_capacity(expected as usize); + loop { + match tokio::time::timeout(Duration::from_secs(5), lcm.recv()).await { + Ok(Ok(msg)) => { + if msg.channel != TOPIC { + continue; + } + if let Some(seq) = parse_seq(&msg.data) { + seqs.push(seq); + let n = received_clone.fetch_add(1, Ordering::Relaxed) + 1; + if n % 25 == 0 || n <= 3 { + eprintln!("[receiver] got seq={} (total={})", seq, n); + } + if handler_ms > 0 { + tokio::time::sleep(Duration::from_millis(handler_ms)).await; + } + } + } + Ok(Err(e)) => { + eprintln!("[receiver] recv error: {e}"); + break; + } + Err(_) => { + eprintln!("[receiver] 5s idle timeout, ending"); + break; + } + } + } + seqs + }); + + let start = Instant::now(); + while start.elapsed() < duration && received.load(Ordering::Relaxed) < expected { + sleep(Duration::from_millis(50)).await; + } + let seqs = recv_task.await.unwrap(); + let count = received.load(Ordering::Relaxed); + (count, seqs) +} + +async fn run_receiver(expected: u32, duration: Duration) -> (u32, Vec) { + let mut opts = LcmOptions::default(); + opts.recv_buf_size = Some(64 * 1024 * 1024); + let lcm = Lcm::with_options(opts).await.expect("create lcm"); + eprintln!("[receiver] subscribed, waiting for messages..."); + let received = Arc::new(AtomicU32::new(0)); + let received_clone = received.clone(); + let recv_task = tokio::spawn(async move { + let mut seqs = Vec::with_capacity(expected as usize); + loop { + match tokio::time::timeout(Duration::from_secs(2), lcm.recv()).await { + Ok(Ok(msg)) => { + if msg.channel != TOPIC { + eprintln!("[receiver] skipping channel '{}'", msg.channel); + continue; + } + if let Some(seq) = parse_seq(&msg.data) { + seqs.push(seq); + let n = received_clone.fetch_add(1, Ordering::Relaxed) + 1; + if n % 25 == 0 || n <= 3 { + eprintln!("[receiver] got seq={} (total={})", seq, n); + } + } + } + Ok(Err(e)) => { + eprintln!("[receiver] recv error: {e}"); + break; + } + Err(_) => { + eprintln!("[receiver] 2s idle timeout, ending"); + break; + } + } + } + seqs + }); + + // Wait for either expected count or duration + let start = Instant::now(); + while start.elapsed() < duration && received.load(Ordering::Relaxed) < expected { + sleep(Duration::from_millis(50)).await; + } + // Give the recv_task its 2s timeout to wind down + let seqs = recv_task.await.unwrap(); + let count = received.load(Ordering::Relaxed); + (count, seqs) +} + +async fn run_sender(msg_size: usize, count: u32, interval_ms: u64) { + let lcm = Lcm::new().await.expect("create lcm"); + sleep(Duration::from_millis(200)).await; + let start = Instant::now(); + for seq in 0..count { + let payload = make_payload(seq, msg_size); + lcm.publish(TOPIC, &payload).await.expect("publish"); + if interval_ms > 0 { + sleep(Duration::from_millis(interval_ms)).await; + } + } + let elapsed = start.elapsed(); + eprintln!( + "[sender] sent {} messages of {} bytes in {:.3} s ({:.1} msg/s, {:.2} MB/s)", + count, + msg_size, + elapsed.as_secs_f64(), + count as f64 / elapsed.as_secs_f64(), + (count as f64 * msg_size as f64) / 1024.0 / 1024.0 / elapsed.as_secs_f64(), + ); +} + +#[tokio::main] +async fn main() { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("usage: burst_test [msg_size] [count] [interval_ms]"); + std::process::exit(1); + } + let mode = &args[1]; + let msg_size: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(500_000); + let count: u32 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(1000); + let interval_ms: u64 = args.get(4).and_then(|s| s.parse().ok()).unwrap_or(100); + + match mode.as_str() { + "sender" => run_sender(msg_size, count, interval_ms).await, + "slow_receiver" => { + let recv_count: u32 = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(100); + let handler_ms: u64 = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(100); + let max_duration = Duration::from_secs(120); + let (received, seqs) = run_slow_receiver(recv_count, max_duration, handler_ms).await; + let unique: std::collections::HashSet<_> = seqs.iter().copied().collect(); + println!( + "received={} unique={} expected={} drop_rate={:.1}%", + received, + unique.len(), + recv_count, + 100.0 * (recv_count as f64 - unique.len() as f64) / recv_count as f64, + ); + } + "receiver" => { + let max_duration = Duration::from_secs(120); + let (received, seqs) = run_receiver(count, max_duration).await; + let unique: std::collections::HashSet<_> = seqs.iter().copied().collect(); + println!( + "received={} unique={} expected={} drop_rate={:.1}%", + received, + unique.len(), + count, + 100.0 * (count as f64 - unique.len() as f64) / count as f64, + ); + let mut sorted: Vec = unique.into_iter().collect(); + sorted.sort(); + let mut gaps = Vec::new(); + let mut prev = -1i64; + for s in &sorted { + let curr = *s as i64; + if curr > prev + 1 && prev != -1 { + gaps.push((prev + 1, curr - 1)); + } + prev = curr; + } + if !gaps.is_empty() { + let total_missing: i64 = gaps.iter().map(|(a, b)| b - a + 1).sum(); + println!( + "missing-seq gaps: {} ranges, {} total missing seqs", + gaps.len(), + total_missing, + ); + for (a, b) in gaps.iter().take(10) { + if a == b { + println!(" seq {} missing", a); + } else { + println!(" seqs [{}, {}] missing ({} seqs)", a, b, b - a + 1); + } + } + } + } + "both" => { + let recv_count = count; + let recv_task = tokio::spawn(async move { + run_receiver(recv_count, Duration::from_secs(120)).await + }); + sleep(Duration::from_millis(300)).await; + run_sender(msg_size, count, interval_ms).await; + let (received, seqs) = recv_task.await.unwrap(); + let unique: std::collections::HashSet<_> = seqs.iter().copied().collect(); + println!( + "received={} unique={} expected={} drop_rate={:.1}%", + received, + unique.len(), + count, + 100.0 * (count as f64 - unique.len() as f64) / count as f64, + ); + } + _ => { + eprintln!("unknown mode: {}", mode); + std::process::exit(1); + } + } +} diff --git a/tools/rust/lcm/src/transport.rs b/tools/rust/lcm/src/transport.rs index ef74aa9..25d5b1c 100644 --- a/tools/rust/lcm/src/transport.rs +++ b/tools/rust/lcm/src/transport.rs @@ -6,6 +6,7 @@ use std::io; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::Mutex; use std::sync::atomic::{AtomicU32, Ordering}; +use std::time::Instant; const MAGIC_SHORT: u32 = 0x4c433032; // "LC02" const MAGIC_LONG: u32 = 0x4c433033; // "LC03" @@ -13,6 +14,10 @@ const SHORT_HEADER_SIZE: usize = 8; const FRAGMENT_HEADER_SIZE: usize = 20; const MAX_DATAGRAM_SIZE: usize = 65507; +// Reassembly buffer caps; matches C LCM's `udpm_util.h`. +const MAX_FRAG_BUF_TOTAL_BYTES: usize = 16 * 1024 * 1024; +const MAX_NUM_FRAG_BUFS: usize = 1000; + /// Default LCM multicast group address. pub const DEFAULT_MULTICAST_GROUP: Ipv4Addr = Ipv4Addr::new(239, 255, 76, 67); /// Default LCM multicast port. @@ -26,6 +31,41 @@ struct FragmentBuffer { num_fragments: u16, received: u16, data: Vec, + last_update: Instant, +} + +/// In-flight fragment buffers with LRU eviction. Same idea as C LCM's +/// `lcm_frag_buf_store`: cap total bytes and entry count, evict oldest +/// when over. +struct FragStore { + map: HashMap<(SocketAddr, u32), FragmentBuffer>, + total_bytes: usize, +} + +impl FragStore { + fn new() -> Self { + Self { map: HashMap::new(), total_bytes: 0 } + } + + fn evict_lru(&mut self) -> bool { + let lru_key = self.map.iter() + .min_by_key(|(_, fb)| fb.last_update) + .map(|(key, _)| *key); + if let Some(key) = lru_key { + if let Some(fb) = self.map.remove(&key) { + self.total_bytes = self.total_bytes.saturating_sub(fb.data.len()); + return true; + } + } + false + } + + fn enforce_caps(&mut self) { + while (self.total_bytes > MAX_FRAG_BUF_TOTAL_BYTES + || self.map.len() > MAX_NUM_FRAG_BUFS) + && self.evict_lru() + {} + } } /// Configuration for an LCM transport instance. @@ -39,6 +79,9 @@ pub struct LcmOptions { pub ttl: u32, /// Network interface to bind to (default: any). pub interface: Ipv4Addr, + /// SO_RCVBUF in bytes. None = OS default. Set to 16-64 MB for + /// sustained large fragmented messages. + pub recv_buf_size: Option, } impl Default for LcmOptions { @@ -48,6 +91,7 @@ impl Default for LcmOptions { port: DEFAULT_PORT, ttl: 1, interface: Ipv4Addr::UNSPECIFIED, + recv_buf_size: None, } } } @@ -77,7 +121,7 @@ fn fragment_params(msg_size: usize, channel_len: usize) -> (usize, usize, usize) pub struct Lcm { socket: UdpSocket, multicast_addr: SocketAddrV4, - reassembly: Mutex>, + reassembly: Mutex, } impl Lcm { @@ -92,6 +136,11 @@ impl Lcm { s2.set_reuse_address(true)?; #[cfg(not(target_os = "windows"))] s2.set_reuse_port(true)?; + if let Some(size) = opts.recv_buf_size { + if let Err(err) = s2.set_recv_buffer_size(size) { + eprintln!("lcm: failed to set SO_RCVBUF={}: {}", size, err); + } + } let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, opts.port); s2.bind(&bind_addr.into())?; @@ -106,7 +155,7 @@ impl Lcm { Ok(Self { socket, multicast_addr: SocketAddrV4::new(opts.multicast_group, opts.port), - reassembly: Mutex::new(HashMap::new()), + reassembly: Mutex::new(FragStore::new()), }) } @@ -237,12 +286,18 @@ impl Lcm { let key = (sender, seqno); let mut reassembly = self.reassembly.lock().unwrap(); - let entry = reassembly.entry(key).or_insert_with(|| FragmentBuffer { + let is_new = !reassembly.map.contains_key(&key); + reassembly.map.entry(key).or_insert_with(|| FragmentBuffer { channel: channel.clone().unwrap_or_default(), num_fragments, received: 0, data: vec![0u8; total_size], + last_update: Instant::now(), }); + if is_new { + reassembly.total_bytes = reassembly.total_bytes.saturating_add(total_size); + } + let entry = reassembly.map.get_mut(&key).unwrap(); // First fragment also sets the channel name on an existing entry if let Some(ch) = channel { @@ -252,15 +307,21 @@ impl Lcm { let end = (fragment_offset + payload.len()).min(total_size); entry.data[fragment_offset..end].copy_from_slice(&payload[..end - fragment_offset]); entry.received += 1; + entry.last_update = Instant::now(); if entry.received == entry.num_fragments { - let complete = reassembly.remove(&key).unwrap(); + let complete = reassembly.map.remove(&key).unwrap(); + reassembly.total_bytes = reassembly.total_bytes.saturating_sub(complete.data.len()); return Ok(Some(ReceivedMessage { channel: complete.channel, data: complete.data, })); } + if is_new { + reassembly.enforce_caps(); + } + Ok(None) } @@ -359,4 +420,66 @@ mod tests { buf[SHORT_HEADER_SIZE..SHORT_HEADER_SIZE + 4].copy_from_slice(b"CHAN"); assert!(Lcm::decode_small(&buf).unwrap().is_none()); } + + // FragStore tests + + fn make_buf(data_size: usize) -> FragmentBuffer { + FragmentBuffer { + channel: String::new(), + num_fragments: 2, + received: 0, + data: vec![0u8; data_size], + last_update: Instant::now(), + } + } + + #[test] + fn frag_store_evicts_when_over_byte_cap() { + let mut store = FragStore::new(); + let big = MAX_FRAG_BUF_TOTAL_BYTES / 2 + 1; + for i in 0..3 { + let buf = make_buf(big); + store.total_bytes += buf.data.len(); + store.map.insert((SocketAddr::from(([127, 0, 0, 1], 0)), i), buf); + std::thread::sleep(std::time::Duration::from_millis(1)); + store.enforce_caps(); + } + assert!(store.total_bytes <= MAX_FRAG_BUF_TOTAL_BYTES); + assert!(store.map.len() < 3); + } + + #[test] + fn frag_store_evicts_when_over_entry_cap() { + let mut store = FragStore::new(); + for i in 0..(MAX_NUM_FRAG_BUFS as u32 + 5) { + let buf = make_buf(8); + store.total_bytes += buf.data.len(); + store.map.insert((SocketAddr::from(([127, 0, 0, 1], 0)), i), buf); + store.enforce_caps(); + } + assert_eq!(store.map.len(), MAX_NUM_FRAG_BUFS); + } + + #[test] + fn frag_store_evict_lru_picks_oldest() { + let mut store = FragStore::new(); + let older = make_buf(8); + std::thread::sleep(std::time::Duration::from_millis(2)); + let newer = make_buf(8); + let older_key = (SocketAddr::from(([127, 0, 0, 1], 0)), 1u32); + let newer_key = (SocketAddr::from(([127, 0, 0, 1], 0)), 2u32); + store.map.insert(older_key, older); + store.map.insert(newer_key, newer); + store.total_bytes = 16; + assert!(store.evict_lru()); + assert!( + !store.map.contains_key(&older_key), + "older entry should have been evicted; map: {:?}", + store.map.keys().collect::>(), + ); + assert!( + store.map.contains_key(&newer_key), + "newer entry should still be present", + ); + } }