From 888cedf6357039a03c32bda89af30a2e881e9f7b Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Sat, 6 Jun 2026 14:25:53 +0800 Subject: [PATCH 01/12] add red (without unit tests) --- rattan-core/src/cells/bandwidth/queue/mod.rs | 6 + rattan-core/src/cells/bandwidth/queue/red.rs | 270 +++++++++++++++++++ 2 files changed, 276 insertions(+) create mode 100644 rattan-core/src/cells/bandwidth/queue/red.rs diff --git a/rattan-core/src/cells/bandwidth/queue/mod.rs b/rattan-core/src/cells/bandwidth/queue/mod.rs index eecaf82..b5cb00f 100644 --- a/rattan-core/src/cells/bandwidth/queue/mod.rs +++ b/rattan-core/src/cells/bandwidth/queue/mod.rs @@ -15,11 +15,17 @@ mod codel; mod drophead; mod droptail; mod infinite; +mod red; +// Duan: mod ared; +// Duan: mod pie; pub use codel::*; pub use drophead::*; pub use droptail::*; pub use infinite::*; +pub use red::*; +// Duan: pub use ared::*; +// Duan: pub use pie::*; #[cfg(feature = "serde")] fn serde_default(t: &T) -> bool { diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs new file mode 100644 index 0000000..5dc2a9e --- /dev/null +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -0,0 +1,270 @@ +// RED Queue Implementation Reference: +// https://github.com/torvalds/linux/blob/master/include/net/red.h + +use std::collections::VecDeque; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use tokio::time::{Instant, Duration}; +use rand::random_range; +use tracing::{debug, warn}; + +#[cfg(feature = "serde")] +use super::serde_default; +use super::{BwType, PacketQueue}; +use crate::cells::Packet; + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize), serde(default))] +#[derive(Debug, Clone)] +pub struct RedQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, + pub w_q: f64, // queue weight for calculating the average queue length + pub min_th: usize, // minimum threshold of average queue length + pub max_th: usize, // maximum threshold of average queue length + pub max_p: f64, // maximum probability of dropping a packet + #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub pkt_tx_time: Duration, // typical packet tx time (us) + #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub bw_type: BwType, +} + +impl Default for RedQueueConfig { + fn default() -> Self { + Self { + packet_limit: None, + byte_limit: None, + w_q: 0.002, + min_th: 7500, // 5 * 1500 bytes + max_th: 22500, // 15 * 1500 bytes + max_p: 0.02, + pkt_tx_time: Duration::from_micros(120), // 1500 bytes * 8 / 100Mbps = 120 us + bw_type: BwType::default(), + } + } +} + +impl RedQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + w_q: f64, + min_th: usize, + max_th: usize, + max_p: f64, + pkt_tx_time: Duration, + bw_type: BwType + ) -> Self { + // Warning: The caller must ensure that the parameters are valid. + // It's recommended to do validation before calling this function, + // or we may need to return a Result instead of Self in the future. + if min_th >= max_th { + warn!("RedQueueConfig: min_th ({}) >= max_th ({}), which may cause invalid behavior.", min_th, max_th); + } + if pkt_tx_time.as_micros() == 0 { + warn!("RedQueueConfig: pkt_tx_time is 0, which will cause divide-by-zero in m calculation."); + } + if !(0.0..=1.0).contains(&w_q) { + warn!("RedQueueConfig: w_q ({}) is out of expected range [0.0, 1.0]. This is an EWMA weight.", w_q); + } + if !(0.0..=1.0).contains(&max_p) { + warn!("RedQueueConfig: max_p ({}) is out of expected range [0.0, 1.0]. This is a probability.", max_p); + } + + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + w_q, + min_th, + max_th, + max_p, + pkt_tx_time, + bw_type + } + } +} + +impl

From for RedQueue

{ + fn from(config: RedQueueConfig) -> Self { + RedQueue::new(config) + } +} + +#[derive(Debug)] +pub struct RedQueue

{ + queue: VecDeque

, + config: RedQueueConfig, + now_bytes: usize, // for calculating average_queue_length + average_queue_length: f64, + count_packet: i32, // number of packets since last dropping + idle_start: Option, // start time of current idle period +} + +impl

RedQueue

{ + pub fn new(config: RedQueueConfig) -> Self { + debug!(?config, "New RedQueue"); + Self { + queue: VecDeque::new(), + config, + now_bytes: 0, + average_queue_length: 0.0, + count_packet: -1, + idle_start: None, + } + } +} + +impl

Default for RedQueue

+where + P: Packet +{ + fn default() -> Self { + Self::new(RedQueueConfig::default()) + } +} + +impl

RedQueue

+where + P: Packet, +{ + fn update_avg (&mut self) { + match self.is_empty() { + false => { + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + self.config.w_q * (self.now_bytes as f64) + }, + true => { + if let Some(idle_start) = self.idle_start { + let now = Instant::now(); + let idle_duration = now.duration_since(idle_start); + let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; + self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); + self.idle_start = Some(now); + } + } + } + } + + fn should_drop (&mut self) -> bool { + let avg = self.average_queue_length; + let min_th = self.config.min_th as f64; + let max_th = self.config.max_th as f64; + if avg >= min_th && avg < max_th { + self.count_packet += 1; + let p_b = self.config.max_p * (avg - min_th) / (max_th - min_th); + let p_a = if self.count_packet as f64 * p_b >= 1.0 { + 1.0 + } else { + p_b / (1.0 - self.count_packet as f64 * p_b) + }; + + let rand_val = random_range(0.0 .. 1.0); + if rand_val < p_a { + self.count_packet = 0; + true + } else { + false + } + } else if avg >= max_th { + self.count_packet = 0; + true + } else { + self.count_packet = -1; + false + } + } +} + +impl

PacketQueue

for RedQueue

+where + P: Packet, +{ + type Config = RedQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.config = config; + } + + fn is_zero_buffer(&self) -> bool { + self.config.packet_limit.is_some_and(|limit| limit == 0) + || self.config.byte_limit.is_some_and(|limit| limit == 0) + } + + fn enqueue(&mut self, packet: P) { + if self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self.config.byte_limit.is_none_or(|limit| { + self.now_bytes + packet.l3_length() + self.config.bw_type.extra_length() <= limit + }) + { + let packet_size = packet.l3_length() + self.get_extra_length(); + self.update_avg(); + match self.should_drop() { + true => { + #[cfg(test)] + tracing::trace!( + avg = self.average_queue_length, + count = self.count_packet, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to RED algorithm", packet.l3_length(), self.get_extra_length() + ); + return; + }, + false => { + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; + } + } + } else { + self.count_packet = 0; + #[cfg(test)] + tracing::trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() + ); + } + } + + fn dequeue(&mut self) -> Option

{ + match self.queue.pop_front() { + Some(packet ) => { + self.now_bytes -= packet.l3_length() + self.get_extra_length(); + if self.is_empty() { + self.idle_start = Some(Instant::now()); + } + Some(packet) + }, + None => None + } + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + #[inline(always)] + fn get_extra_length(&self) -> usize { + self.config.bw_type.extra_length() + } + + fn get_front_size(&self) -> Option { + self.queue + .front() + .map(|packet| self.get_packet_size(packet)) + } + + fn length(&self) -> usize { + self.queue.len() + } + + fn retain(&mut self, mut f: F) + where + F: FnMut(&P) -> bool, + { + self.queue.retain(|packet| f(packet)); + } +} \ No newline at end of file From 4810deaa9e8e24867926640bfd6ae09fa9184542 Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Sat, 6 Jun 2026 19:20:00 +0800 Subject: [PATCH 02/12] add red unit tests --- rattan-core/src/cells/bandwidth/queue/red.rs | 139 +++++++++++++++++++ 1 file changed, 139 insertions(+) diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs index 5dc2a9e..360dd83 100644 --- a/rattan-core/src/cells/bandwidth/queue/red.rs +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -267,4 +267,143 @@ where { self.queue.retain(|packet| f(packet)); } +} + + +#[cfg(test)] +mod tests { + use super::*; + use crate::cells::{Packet, StdPacket}; + use tokio::time::Instant; + + fn create_packet(size: usize) -> StdPacket { + let buf = vec![0u8; size]; + StdPacket::with_timestamp(&buf, Instant::now()) + } + + #[test_log::test] + fn test_red_queue_basic() { + let mut config = RedQueueConfig::default(); + config.min_th = 1000; + config.max_th = 2000; + let mut queue: RedQueue = RedQueue::new(config); + + assert!(queue.is_empty()); + + let pkt1 = create_packet(500); + queue.enqueue(pkt1); + assert!(!queue.is_empty()); + assert_eq!(queue.length(), 1); + + let dequeued = queue.dequeue(); + assert!(dequeued.is_some()); + assert!(queue.is_empty()); + } + + #[test_log::test] + fn test_red_queue_hard_limit_packet() { + let mut config = RedQueueConfig::default(); + config.packet_limit = Some(2); + config.min_th = 100000; // avoid red drop + config.max_th = 200000; + let mut queue: RedQueue = RedQueue::new(config); + + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + + // This one should be dropped due to packet limit + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + } + + #[test_log::test] + fn test_red_queue_hard_limit_byte() { + let mut config = RedQueueConfig::default(); + config.byte_limit = Some(150); + config.min_th = 100000; // avoid red drop + config.max_th = 200000; + let mut queue: RedQueue = RedQueue::new(config); + + queue.enqueue(create_packet(100)); // l3 length 86. + assert_eq!(queue.length(), 1); + + // This one should be dropped due to byte limit (86 + 86 > 150) + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + } + + #[test_log::test] + fn test_red_queue_max_th_drop() { + let mut config = RedQueueConfig::default(); + config.min_th = 100; + config.max_th = 200; + config.w_q = 1.0; // max weight, avg matches instantly + let mut queue: RedQueue = RedQueue::new(config); + + // First packet + queue.enqueue(create_packet(100)); + + // Second packet + queue.enqueue(create_packet(300)); + + // At this point, queue length is 2, now_bytes is high enough. + // The next enqueue should see average_queue_length > max_th and drop the packet. + let before_len = queue.length(); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), before_len, "Packet should be dropped by RED max_th"); + } + + #[test_log::test] + fn test_red_queue_min_th_no_drop() { + let mut config = RedQueueConfig::default(); + config.min_th = 1000; + config.max_th = 2000; + config.w_q = 1.0; // Instantly reach exact byte size + let mut queue: RedQueue = RedQueue::new(config); + + // First packet: queue empty, avg remains 0. + queue.enqueue(create_packet(514)); // L3 size = 514 - 14 (Ethernet header) = 500 + assert_eq!(queue.length(), 1); + + // Second packet: queue has 500 bytes. w_q=1.0 makes avg = 500. + // 500 < min_th(1000), so it should not drop. + queue.enqueue(create_packet(414)); // L3 size = 400 + assert_eq!(queue.length(), 2); + + // Check internal state: count_packet is -1 when avg < min_th + assert_eq!(queue.count_packet, -1); + } + + #[test_log::test] + fn test_red_queue_probabilistic_drop() { + let mut config = RedQueueConfig::default(); + config.min_th = 100; + config.max_th = 300; + config.max_p = 0.5; + config.w_q = 1.0; + let mut queue: RedQueue = RedQueue::new(config); + + // First packet: queue empty, avg = 0. L3 size = 200. + queue.enqueue(create_packet(214)); + assert_eq!(queue.length(), 1); + + let mut drop_count = 0; + let total_packets = 1000; + + for _ in 0..total_packets { + // enqueue packets with L3 size 0 (total size 14). + // now_bytes stays at 200. w_q=1.0 makes avg exactly 200. + // 100 (min_th) <= avg(200) < 300 (max_th), entering probabilistic drop zone. + let before = queue.length(); + queue.enqueue(create_packet(14)); + if queue.length() == before { + drop_count += 1; + } + } + + // It should drop some packets, but not all of them + assert!(drop_count > 0, "Should have dropped some packets probabilistically"); + assert!(drop_count < total_packets, "Should not drop all packets"); + } } \ No newline at end of file From 713d2f1d762f124feb5e18433dfdc49a848691db Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Sun, 7 Jun 2026 00:10:52 +0800 Subject: [PATCH 03/12] add ared & fix red --- rattan-core/src/cells/bandwidth/queue/ared.rs | 290 ++++++++++++++++++ rattan-core/src/cells/bandwidth/queue/mod.rs | 4 +- rattan-core/src/cells/bandwidth/queue/red.rs | 10 +- 3 files changed, 298 insertions(+), 6 deletions(-) create mode 100644 rattan-core/src/cells/bandwidth/queue/ared.rs diff --git a/rattan-core/src/cells/bandwidth/queue/ared.rs b/rattan-core/src/cells/bandwidth/queue/ared.rs new file mode 100644 index 0000000..e12ed29 --- /dev/null +++ b/rattan-core/src/cells/bandwidth/queue/ared.rs @@ -0,0 +1,290 @@ +// Adaptive RED Queue Implementation Reference: +// https://www.icir.org/floyd/papers/adaptiveRed.pdf#:~:text=We%20find%20that%20this%20re-vised%20version%20of%20Adaptive,length%20in%20a%20wide%20variety%20of%20traffic%20scenarios. + +use std::collections::VecDeque; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use tokio::time::{Instant, Duration}; +use rand::random_range; +use tracing::{debug, warn}; + +#[cfg(feature = "serde")] +use super::serde_default; +use super::{BwType, PacketQueue}; +use crate::cells::Packet; + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize), serde(default))] +#[derive(Debug, Clone)] +pub struct AdaptiveRedQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, + pub w_q: f64, // queue weight for calculating the average queue length + pub min_th: usize, // minimum threshold of average queue length + pub max_th: usize, // maximum threshold of average queue length + pub max_p: f64, // maximum probability of dropping a packet + #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub pkt_tx_time: Duration, // typical packet tx time (us) + #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub bw_type: BwType, +} + +impl Default for AdaptiveRedQueueConfig { + fn default() -> Self { + Self { + packet_limit: None, + byte_limit: None, + w_q: 0.002, + min_th: 7500, // 5 * 1500 bytes + max_th: 22500, // 15 * 1500 bytes + max_p: 0.02, + pkt_tx_time: Duration::from_micros(120), // 1500 bytes * 8 / 100Mbps = 120 us + bw_type: BwType::default(), + } + } +} + +impl AdaptiveRedQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + w_q: f64, + min_th: usize, + max_th: usize, + max_p: f64, + pkt_tx_time: Duration, + bw_type: BwType + ) -> Self { + // Warning: The caller must ensure that the parameters are valid. + // It's recommended to do validation before calling this function, + // or we may need to return a Result instead of Self in the future. + if min_th >= max_th { + warn!("AdaptiveRedQueueConfig: min_th ({}) >= max_th ({}), which may cause invalid behavior.", min_th, max_th); + } + if pkt_tx_time.as_micros() == 0 { + warn!("AdaptiveRedQueueConfig: pkt_tx_time is 0, which will cause divide-by-zero in m calculation."); + } + if !(0.0..=1.0).contains(&w_q) { + warn!("AdaptiveRedQueueConfig: w_q ({}) is out of expected range [0.0, 1.0]. This is an EWMA weight.", w_q); + } + if !(0.0..=1.0).contains(&max_p) { + warn!("AdaptiveRedQueueConfig: max_p ({}) is out of expected range [0.0, 1.0]. This is a probability.", max_p); + } + + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + w_q, + min_th, + max_th, + max_p, + pkt_tx_time, + bw_type + } + } +} + +impl

From for AdaptiveRedQueue

{ + fn from(config: AdaptiveRedQueueConfig) -> Self { + AdaptiveRedQueue::new(config) + } +} + +#[derive(Debug)] +pub struct AdaptiveRedQueue

{ + queue: VecDeque

, + config: AdaptiveRedQueueConfig, + now_bytes: usize, // for calculating average_queue_length + average_queue_length: f64, + count_packet: i32, // number of packets since last dropping + idle_start: Option, // start time of current idle period + latest_max_p_update: Instant, // latest time when max_p is updates +} + +impl

AdaptiveRedQueue

{ + pub fn new(config: AdaptiveRedQueueConfig) -> Self { + debug!(?config, "New AdaptiveRedQueue"); + Self { + queue: VecDeque::new(), + config, + now_bytes: 0, + average_queue_length: 0.0, + count_packet: -1, + idle_start: None, + latest_max_p_update: Instant::now(), + } + } +} + +impl

Default for AdaptiveRedQueue

+where + P: Packet +{ + fn default() -> Self { + Self::new(AdaptiveRedQueueConfig::default()) + } +} + +impl

AdaptiveRedQueue

+where + P: Packet, +{ + fn update_avg (&mut self) { + match self.is_empty() { + false => { + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + self.config.w_q * (self.now_bytes as f64) + }, + true => { + if let Some(idle_start) = self.idle_start { + let now = Instant::now(); + let idle_duration = now.saturating_duration_since(idle_start); + let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; + self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); + self.idle_start = Some(now); + } + } + } + } + + fn should_drop (&mut self) -> bool { + let avg = self.average_queue_length; + let min_th = self.config.min_th as f64; + let max_th = self.config.max_th as f64; + if avg >= min_th && avg < max_th { + self.count_packet += 1; + let p_b = self.config.max_p * (avg - min_th) / (max_th - min_th); + let p_a = if self.count_packet as f64 * p_b >= 1.0 { + 1.0 + } else { + p_b / (1.0 - self.count_packet as f64 * p_b) + }; + + let rand_val = random_range(0.0 .. 1.0); + if rand_val < p_a { + self.count_packet = 0; + true + } else { + false + } + } else if avg >= max_th { + self.count_packet = 0; + true + } else { + self.count_packet = -1; + false + } + } + + fn update_max_p (&mut self) { + let target_min = self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64; + let target_max = self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64; + if self.average_queue_length > target_max && self.config.max_p <= 0.5 { + self.config.max_p += (self.config.max_p / 4.0).min(0.01); + } else if self.average_queue_length < target_min && self.config.max_p >= 0.01 { + self.config.max_p *= 0.9; + } + } +} + +impl

PacketQueue

for AdaptiveRedQueue

+where + P: Packet, +{ + type Config = AdaptiveRedQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.config = config; + } + + fn is_zero_buffer(&self) -> bool { + self.config.packet_limit.is_some_and(|limit| limit == 0) + || self.config.byte_limit.is_some_and(|limit| limit == 0) + } + + fn enqueue(&mut self, packet: P) { + self.update_avg(); + + let now = Instant::now(); + if now.saturating_duration_since(self.latest_max_p_update) >= Duration::from_millis(500) { + self.update_max_p(); + self.latest_max_p_update = now; + } + + if self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self.config.byte_limit.is_none_or(|limit| { + self.now_bytes + packet.l3_length() + self.config.bw_type.extra_length() <= limit + }) + { + let packet_size = packet.l3_length() + self.get_extra_length(); + + match self.should_drop() { + true => { + #[cfg(test)] + tracing::trace!( + avg = self.average_queue_length, + count = self.count_packet, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to ARED algorithm", packet.l3_length(), self.get_extra_length() + ); + return; + }, + false => { + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; + } + } + } else { + self.count_packet = 0; + #[cfg(test)] + tracing::trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() + ); + } + } + + fn dequeue(&mut self) -> Option

{ + match self.queue.pop_front() { + Some(packet ) => { + self.now_bytes -= packet.l3_length() + self.get_extra_length(); + if self.is_empty() { + self.idle_start = Some(Instant::now()); + } + Some(packet) + }, + None => None + } + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + #[inline(always)] + fn get_extra_length(&self) -> usize { + self.config.bw_type.extra_length() + } + + fn get_front_size(&self) -> Option { + self.queue + .front() + .map(|packet| self.get_packet_size(packet)) + } + + fn length(&self) -> usize { + self.queue.len() + } + + fn retain(&mut self, mut f: F) + where + F: FnMut(&P) -> bool, + { + self.queue.retain(|packet| f(packet)); + } +} diff --git a/rattan-core/src/cells/bandwidth/queue/mod.rs b/rattan-core/src/cells/bandwidth/queue/mod.rs index b5cb00f..77973e6 100644 --- a/rattan-core/src/cells/bandwidth/queue/mod.rs +++ b/rattan-core/src/cells/bandwidth/queue/mod.rs @@ -16,7 +16,7 @@ mod drophead; mod droptail; mod infinite; mod red; -// Duan: mod ared; +mod ared; // Duan: mod pie; pub use codel::*; @@ -24,7 +24,7 @@ pub use drophead::*; pub use droptail::*; pub use infinite::*; pub use red::*; -// Duan: pub use ared::*; +pub use ared::*; // Duan: pub use pie::*; #[cfg(feature = "serde")] diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs index 360dd83..0f9becb 100644 --- a/rattan-core/src/cells/bandwidth/queue/red.rs +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -1,4 +1,5 @@ // RED Queue Implementation Reference: +// https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=251892 // https://github.com/torvalds/linux/blob/master/include/net/red.h use std::collections::VecDeque; @@ -135,7 +136,7 @@ where true => { if let Some(idle_start) = self.idle_start { let now = Instant::now(); - let idle_duration = now.duration_since(idle_start); + let idle_duration = now.saturating_duration_since(idle_start); let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); self.idle_start = Some(now); @@ -190,6 +191,8 @@ where } fn enqueue(&mut self, packet: P) { + self.update_avg(); + if self .config .packet_limit @@ -199,7 +202,7 @@ where }) { let packet_size = packet.l3_length() + self.get_extra_length(); - self.update_avg(); + match self.should_drop() { true => { #[cfg(test)] @@ -273,8 +276,7 @@ where #[cfg(test)] mod tests { use super::*; - use crate::cells::{Packet, StdPacket}; - use tokio::time::Instant; + use crate::cells::StdPacket; fn create_packet(size: usize) -> StdPacket { let buf = vec![0u8; size]; From 18360e8546ab1c9f3824fbf23a7b774f6ba5df2b Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Sun, 7 Jun 2026 00:25:24 +0800 Subject: [PATCH 04/12] add ared unit tests --- rattan-core/src/cells/bandwidth/queue/ared.rs | 195 ++++++++++++++++++ 1 file changed, 195 insertions(+) diff --git a/rattan-core/src/cells/bandwidth/queue/ared.rs b/rattan-core/src/cells/bandwidth/queue/ared.rs index e12ed29..22c45d1 100644 --- a/rattan-core/src/cells/bandwidth/queue/ared.rs +++ b/rattan-core/src/cells/bandwidth/queue/ared.rs @@ -288,3 +288,198 @@ where self.queue.retain(|packet| f(packet)); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::cells::StdPacket; + + fn create_packet(size: usize) -> StdPacket { + let buf = vec![0u8; size]; + StdPacket::with_timestamp(&buf, Instant::now()) + } + + #[test_log::test] + fn test_ared_queue_basic() { + let mut config = AdaptiveRedQueueConfig::default(); + config.min_th = 1000; + config.max_th = 2000; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + assert!(queue.is_empty()); + + let pkt1 = create_packet(500); + queue.enqueue(pkt1); + assert!(!queue.is_empty()); + assert_eq!(queue.length(), 1); + + let dequeued = queue.dequeue(); + assert!(dequeued.is_some()); + assert!(queue.is_empty()); + } + + #[test_log::test] + fn test_ared_queue_hard_limit_packet() { + let mut config = AdaptiveRedQueueConfig::default(); + config.packet_limit = Some(2); + config.min_th = 100000; // avoid red drop + config.max_th = 200000; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + + // This one should be dropped due to packet limit + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + } + + #[test_log::test] + fn test_ared_queue_hard_limit_byte() { + let mut config = AdaptiveRedQueueConfig::default(); + config.byte_limit = Some(150); + config.min_th = 100000; // avoid red drop + config.max_th = 200000; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + queue.enqueue(create_packet(100)); // l3 length 86. + assert_eq!(queue.length(), 1); + + // This one should be dropped due to byte limit (86 + 86 > 150) + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + } + + #[test_log::test] + fn test_ared_queue_max_th_drop() { + let mut config = AdaptiveRedQueueConfig::default(); + config.min_th = 100; + config.max_th = 200; + config.w_q = 1.0; // max weight, avg matches instantly + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // First packet + queue.enqueue(create_packet(100)); + + // Second packet + queue.enqueue(create_packet(300)); + + // At this point, queue length is 2, now_bytes is high enough. + // The next enqueue should see average_queue_length > max_th and drop the packet. + let before_len = queue.length(); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), before_len, "Packet should be dropped by ARED max_th"); + } + + #[test_log::test] + fn test_ared_queue_min_th_no_drop() { + let mut config = AdaptiveRedQueueConfig::default(); + config.min_th = 1000; + config.max_th = 2000; + config.w_q = 1.0; // Instantly reach exact byte size + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // First packet: queue empty, avg remains 0. + queue.enqueue(create_packet(514)); // L3 size = 514 - 14 (Ethernet header) = 500 + assert_eq!(queue.length(), 1); + + // Second packet: queue has 500 bytes. w_q=1.0 makes avg = 500. + // 500 < min_th(1000), so it should not drop. + queue.enqueue(create_packet(414)); // L3 size = 400 + assert_eq!(queue.length(), 2); + + // Check internal state: count_packet is -1 when avg < min_th + assert_eq!(queue.count_packet, -1); + } + + #[test_log::test] + fn test_ared_queue_probabilistic_drop() { + let mut config = AdaptiveRedQueueConfig::default(); + config.min_th = 100; + config.max_th = 300; + config.max_p = 0.5; + config.w_q = 1.0; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // First packet: queue empty, avg = 0. L3 size = 200. + queue.enqueue(create_packet(214)); + assert_eq!(queue.length(), 1); + + let mut drop_count = 0; + let total_packets = 1000; + + for _ in 0..total_packets { + // enqueue packets with L3 size 0 (total size 14). + // now_bytes stays at 200. w_q=1.0 makes avg exactly 200. + // 100 (min_th) <= avg(200) < 300 (max_th), entering probabilistic drop zone. + let before = queue.length(); + queue.enqueue(create_packet(14)); + if queue.length() == before { + drop_count += 1; + } + } + + // It should drop some packets, but not all of them + assert!(drop_count > 0, "Should have dropped some packets probabilistically"); + assert!(drop_count < total_packets, "Should not drop all packets"); + } + + #[test_log::test] + fn test_ared_queue_max_p_increase() { + let mut config = AdaptiveRedQueueConfig::default(); + config.min_th = 100; + config.max_th = 200; + config.max_p = 0.02; + config.w_q = 1.0; // Instantly update avg + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // enqueue to make average_queue_length > target_max + // target_max = min_th + 0.6 * (max_th - min_th) = 100 + 60 = 160 + // We make avg = 180 (L3 size 180, total 194) + queue.enqueue(create_packet(194)); + assert_eq!(queue.average_queue_length, 0.0); // First enqueue updates avg based on empty queue rule (avg=0). + + // Second enqueue updates avg to 180 + queue.enqueue(create_packet(14)); + assert_eq!(queue.average_queue_length, 180.0); + + // Set latest_max_p_update to 600ms ago to trigger update_max_p + queue.latest_max_p_update = Instant::now() - Duration::from_millis(600); + let before_max_p = queue.config.max_p; + + // Third enqueue triggers update_max_p + queue.enqueue(create_packet(14)); + + let after_max_p = queue.config.max_p; + assert!(after_max_p > before_max_p, "max_p should increase when avg > target_max"); + } + + #[test_log::test] + fn test_ared_queue_max_p_decrease() { + let mut config = AdaptiveRedQueueConfig::default(); + config.min_th = 100; + config.max_th = 200; + config.max_p = 0.05; // Starting with a high max_p + config.w_q = 1.0; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // enqueue to make average_queue_length < target_min + // target_min = min_th + 0.4 * (max_th - min_th) = 100 + 40 = 140 + // We make avg = 120 (L3 size 120, total 134) + queue.enqueue(create_packet(134)); + assert_eq!(queue.average_queue_length, 0.0); + + queue.enqueue(create_packet(14)); + assert_eq!(queue.average_queue_length, 120.0); + + // Set latest_max_p_update to 600ms ago + queue.latest_max_p_update = Instant::now() - Duration::from_millis(600); + let before_max_p = queue.config.max_p; + + queue.enqueue(create_packet(14)); + + let after_max_p = queue.config.max_p; + assert!(after_max_p < before_max_p, "max_p should decrease when avg < target_min"); + } +} From 52df2e94aa2e186c2dccceb90b420a742dd168a5 Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Mon, 8 Jun 2026 15:46:24 +0800 Subject: [PATCH 05/12] add pie (without unit tests) --- rattan-core/src/cells/bandwidth/queue/mod.rs | 4 +- rattan-core/src/cells/bandwidth/queue/pie.rs | 330 +++++++++++++++++++ 2 files changed, 332 insertions(+), 2 deletions(-) create mode 100644 rattan-core/src/cells/bandwidth/queue/pie.rs diff --git a/rattan-core/src/cells/bandwidth/queue/mod.rs b/rattan-core/src/cells/bandwidth/queue/mod.rs index 77973e6..40c2065 100644 --- a/rattan-core/src/cells/bandwidth/queue/mod.rs +++ b/rattan-core/src/cells/bandwidth/queue/mod.rs @@ -17,7 +17,7 @@ mod droptail; mod infinite; mod red; mod ared; -// Duan: mod pie; +mod pie; pub use codel::*; pub use drophead::*; @@ -25,7 +25,7 @@ pub use droptail::*; pub use infinite::*; pub use red::*; pub use ared::*; -// Duan: pub use pie::*; +pub use pie::*; #[cfg(feature = "serde")] fn serde_default(t: &T) -> bool { diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs new file mode 100644 index 0000000..7e9e819 --- /dev/null +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -0,0 +1,330 @@ +// PIE Queue Implementation Reference: +// https://www.rfc-editor.org/info/rfc8033 +// https://ieeexplore.ieee.org/document/6602305 +// Reproduced according to RFC 8033 Appendix B, +// rather than original paper or RFC Appendix A. + +use std::collections::VecDeque; + +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use tokio::time::{Instant, Duration}; +use rand::random_range; +use tracing::debug; + +#[cfg(feature = "serde")] +use super::serde_default; +use super::{BwType, PacketQueue}; +use crate::cells::Packet; + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize), serde(default))] +#[derive(Debug, Clone)] +pub struct PieQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, + pub ref_del: f64, // target delay (sec) + pub t_update: Duration, // update interval + pub tilde_alpha: f64, // base value of alpha (Hz, 1/sec) + pub tilde_beta: f64, // base value of beta (Hz, 1/sec) + pub dq_threshold: usize, // threshold of queue length (bytes) + pub epsilon: f64, // EWMA weight + pub max_burst: f64, // MAX_BURST (ms) + #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub bw_type: BwType, +} + +impl Default for PieQueueConfig { + fn default() -> Self { + Self { + packet_limit: None, + byte_limit: None, + ref_del: 0.015, // RFC 8033 + t_update: Duration::from_millis(15), + tilde_alpha: 0.125, + tilde_beta: 1.25, + dq_threshold: 16384, // 16 KB + epsilon: 0.125, + max_burst: 150.0, + bw_type: BwType::default(), + } + } +} + +impl PieQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + ref_del: f64, + t_update: Duration, + tilde_alpha: f64, + tilde_beta: f64, + dq_threshold: usize, + epsilon: f64, + max_burst: f64, + bw_type: BwType + ) -> Self { + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + ref_del, + t_update, + tilde_alpha, + tilde_beta, + dq_threshold, + epsilon, + max_burst, + bw_type + } + } +} + +impl

From for PieQueue

{ + fn from(config: PieQueueConfig) -> Self { + PieQueue::new(config) + } +} + +#[derive(Debug)] +pub struct PieQueue

{ + queue: VecDeque

, + config: PieQueueConfig, + now_bytes: usize, + old_del: f64, // previous delay (sec) + p: f64, // current drop probability + dq_count: usize, // departure count (bytes) + start_update: Instant, // start time of t_update + start_measurement: Option, // Some(Instant) when in a measurement cycle, None when quit + avg_drate: f64, + burst_allowance: f64 +} + +impl

PieQueue

{ + pub fn new(config: PieQueueConfig) -> Self { + debug!(?config, "New PieQueue"); + let max_burst = config.max_burst; + Self { + queue: VecDeque::new(), + config, + now_bytes: 0, + old_del: 0.0, + p: 0.0, + dq_count: 0, + start_update: Instant::now(), + start_measurement: None, + avg_drate: 0.0, + burst_allowance: max_burst + } + } +} + +impl

Default for PieQueue

+where + P: Packet +{ + fn default() -> Self { + Self::new(PieQueueConfig::default()) + } +} + +impl

PieQueue

+where + P: Packet, +{ + fn update_drop_probability(&mut self) { + let now = Instant::now(); + let elapsed_ms = now.saturating_duration_since(self.start_update).as_secs_f64() * 1000.0; + + let cur_del = if self.avg_drate.abs() < f64::EPSILON { + 0.0 + } else { + self.now_bytes as f64 / self.avg_drate + }; + + let mut p_increment = self.config.tilde_alpha * (cur_del - self.config.ref_del) + + self.config.tilde_beta * (cur_del - self.old_del); + if self.p < 0.000001 { + p_increment /= 2048.0; + } else if self.p < 0.00001 { + p_increment /= 512.0; + } else if self.p < 0.0001 { + p_increment /= 128.0; + } else if self.p < 0.001 { + p_increment /= 32.0; + } else if self.p < 0.01 { + p_increment /= 8.0; + } else if self.p < 0.1 { + p_increment /= 2.0; + } + self.p += p_increment; + // RFC 8033 Section 4.2: Exponential decay when system is not congested + if cur_del < self.config.ref_del / 2.0 && self.old_del < self.config.ref_del / 2.0 { + self.p *= 0.98; + } + self.p = self.p.clamp(0.0, 1.0); + + // RFC 8033 Section 4.4: Burst Tolerance + if self.p < f64::EPSILON && cur_del < self.config.ref_del / 2.0 && self.old_del < self.config.ref_del / 2.0 { + self.burst_allowance = self.config.max_burst; + } else { + self.burst_allowance = (self.burst_allowance - elapsed_ms).max(0.0); + } + + self.old_del = cur_del; + self.start_update = now; + } + + fn should_drop(&self) -> bool { + // RFC 8033 Section 4.4: Enqueue packet bypassing random drop if burst_allow > 0 + if self.burst_allowance > f64::EPSILON { + return false; + } + + // RFC 8033 Section 4.1: Bypass random drop logic to be work conserving + let bypass_drop = (self.old_del < self.config.ref_del / 2.0 && self.p < 0.2) + || self.queue.len() <= 2; + if bypass_drop { + return false; + } + + let rand_val = random_range(0.0 .. 1.0); + rand_val < self.p + } + + fn update_avg_drate(&mut self, pkt_size: usize) { + let now = Instant::now(); + + // Enter a measurement cycle + if self.now_bytes > self.config.dq_threshold && self.start_measurement.is_none() { + self.start_measurement = Some(now); + self.dq_count = 0; + } + + // Update departure rate if we are in a measurement cycle + if let Some(start) = self.start_measurement { + self.dq_count += pkt_size; + if self.dq_count > self.config.dq_threshold { + let dq_int = now.saturating_duration_since(start).as_secs_f64(); + if dq_int > f64::EPSILON { + let dq_rate = self.dq_count as f64 / dq_int; + if self.avg_drate.abs() < f64::EPSILON { + self.avg_drate = dq_rate; + } else { + // Duan: epsilon 参数可能不能自己设 + self.avg_drate = (1.0 - self.config.epsilon) * self.avg_drate + self.config.epsilon * dq_rate; + } + self.start_measurement = Some(now); + self.dq_count = 0; + } + } + + // Exit measurement cycle if queue length drops below threshold + if self.now_bytes < self.config.dq_threshold { + self.start_measurement = None; + self.dq_count = 0; + } + } + } +} + +impl

PacketQueue

for PieQueue

+where + P: Packet, +{ + type Config = PieQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.config = config; + } + + fn is_zero_buffer(&self) -> bool { + self.config.packet_limit.is_some_and(|limit| limit == 0) + || self.config.byte_limit.is_some_and(|limit| limit == 0) + } + + fn enqueue(&mut self, packet: P) { + // Simulate time-driven with event-driven approach + let interval_update = Instant::now().saturating_duration_since(self.start_update); + if interval_update >= self.config.t_update { + self.update_drop_probability(); + } + + // hard limit check + if self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self.config.byte_limit.is_none_or(|limit| { + self.now_bytes + packet.l3_length() + self.config.bw_type.extra_length() <= limit + }) + { + match self.should_drop() { + true => { + #[cfg(test)] + tracing::trace!( + p = self.p, + old_delay = self.old_del, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to PIE algorithm", packet.l3_length(), self.get_extra_length() + ); + return; + }, + false => { + self.now_bytes += packet.l3_length() + self.get_extra_length(); + self.queue.push_back(packet); + } + } + } else { + #[cfg(test)] + tracing::trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() + ); + } + } + + fn dequeue(&mut self) -> Option

{ + // Simulate time-driven with event-driven approach + let interval_update = Instant::now().saturating_duration_since(self.start_update); + if interval_update >= self.config.t_update { + self.update_drop_probability(); + } + + match self.queue.pop_front() { + Some(packet) => { + let pkt_size = packet.l3_length() + self.get_extra_length(); + self.now_bytes -= pkt_size; + self.update_avg_drate(pkt_size); + Some(packet) + }, + None => None, + } + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + #[inline(always)] + fn get_extra_length(&self) -> usize { + self.config.bw_type.extra_length() + } + + fn get_front_size(&self) -> Option { + self.queue + .front() + .map(|packet| self.get_packet_size(packet)) + } + + fn length(&self) -> usize { + self.queue.len() + } + + fn retain(&mut self, mut f: F) + where + F: FnMut(&P) -> bool, + { + self.queue.retain(|packet| f(packet)); + } +} \ No newline at end of file From 1109872461d71d2370a9ada7fbf992707eac4a81 Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Mon, 8 Jun 2026 18:13:56 +0800 Subject: [PATCH 06/12] update pie --- rattan-core/src/cells/bandwidth/queue/pie.rs | 154 ++++++++++++++++++- 1 file changed, 151 insertions(+), 3 deletions(-) diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs index 7e9e819..d3d9abb 100644 --- a/rattan-core/src/cells/bandwidth/queue/pie.rs +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -168,7 +168,6 @@ where } else { self.burst_allowance = (self.burst_allowance - elapsed_ms).max(0.0); } - self.old_del = cur_del; self.start_update = now; } @@ -202,14 +201,13 @@ where // Update departure rate if we are in a measurement cycle if let Some(start) = self.start_measurement { self.dq_count += pkt_size; - if self.dq_count > self.config.dq_threshold { + if self.dq_count >= self.config.dq_threshold { let dq_int = now.saturating_duration_since(start).as_secs_f64(); if dq_int > f64::EPSILON { let dq_rate = self.dq_count as f64 / dq_int; if self.avg_drate.abs() < f64::EPSILON { self.avg_drate = dq_rate; } else { - // Duan: epsilon 参数可能不能自己设 self.avg_drate = (1.0 - self.config.epsilon) * self.avg_drate + self.config.epsilon * dq_rate; } self.start_measurement = Some(now); @@ -327,4 +325,154 @@ where { self.queue.retain(|packet| f(packet)); } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cells::StdPacket; + + fn create_packet(size: usize) -> StdPacket { + let buf = vec![0u8; size]; + StdPacket::with_timestamp(&buf, Instant::now()) + } + + #[test_log::test] + fn test_pie_queue_basic() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config); + + assert!(queue.is_empty()); + + let pkt1 = create_packet(500); + queue.enqueue(pkt1); + assert!(!queue.is_empty()); + assert_eq!(queue.length(), 1); + + let dequeued = queue.dequeue(); + assert!(dequeued.is_some()); + assert!(queue.is_empty()); + } + + #[test_log::test] + fn test_pie_queue_hard_limit_packet() { + let mut config = PieQueueConfig::default(); + config.packet_limit = Some(2); + let mut queue: PieQueue = PieQueue::new(config); + + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + + // This one should be dropped due to packet limit + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + } + + #[test_log::test] + fn test_pie_queue_hard_limit_byte() { + let mut config = PieQueueConfig::default(); + config.byte_limit = Some(150); + let mut queue: PieQueue = PieQueue::new(config); + + queue.enqueue(create_packet(100)); // l3 length 86. + assert_eq!(queue.length(), 1); + + // This one should be dropped due to byte limit (86 + 86 > 150) + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + } + + #[test_log::test] + fn test_pie_queue_burst_allowance() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config); + + // Force a high drop probability + queue.p = 1.0; + queue.burst_allowance = 100.0; + + // burst_allowance > 0 bypasses random drop + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + + // Fill queue > 2 to bypass work conserving logic later + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + + queue.burst_allowance = 0.0; + // With burst_allowance = 0.0, queue > 2, and p = 1.0, it should drop + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + } + + #[test_log::test] + fn test_pie_queue_work_conserving() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config.clone()); + + queue.p = 1.0; + queue.burst_allowance = 0.0; + + // bypass_drop handles queue.len() <= 2 + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + + // For the 4th element, queue.len() is 3, so it does not bypass based on length + // Since p = 1.0, it drops + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + + // Now test bypass_drop condition: old_del < ref_del/2 and p < 0.2 + queue.p = 0.15; + queue.old_del = config.ref_del / 3.0; + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 4); + } + + #[test_log::test] + fn test_pie_queue_avg_drate_update() { + let mut config = PieQueueConfig::default(); + config.dq_threshold = 50; // Small threshold + let mut queue: PieQueue = PieQueue::new(config); + + queue.enqueue(create_packet(114)); // l3 length 100 + queue.enqueue(create_packet(114)); // l3 length 100 + assert_eq!(queue.now_bytes, 200); + + // First dequeue triggers start of measurement cycle + assert!(queue.start_measurement.is_none()); + queue.dequeue(); // dequeues 100 bytes + assert!(queue.start_measurement.is_some()); + assert_eq!(queue.now_bytes, 100); + + std::thread::sleep(Duration::from_millis(10)); + + // Second dequeue triggers calculation of avg_drate + queue.dequeue(); // dequeues 100 bytes + assert!(queue.avg_drate > 0.0, "avg_drate should be calculated"); + assert!(queue.start_measurement.is_none(), "Should exit measurement cycle since queue is empty"); + } + + #[test_log::test] + fn test_pie_queue_update_drop_probability() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config.clone()); + + // Fake high delay + queue.avg_drate = 1000.0; + queue.now_bytes = 100000; // delay = 100.0s > ref_del + + std::thread::sleep(config.t_update); // Wait to exceed t_update + + // This enqueue will trigger update_drop_probability() + queue.enqueue(create_packet(14)); + + assert!(queue.p > 0.0, "Probability should increase when delay is high"); + } } \ No newline at end of file From 608ff1bdd1d32204a60200c99d69f56e421c5341 Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Mon, 8 Jun 2026 19:28:23 +0800 Subject: [PATCH 07/12] update pie unit tests --- rattan-core/src/cells/bandwidth/queue/pie.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs index d3d9abb..0c1d01c 100644 --- a/rattan-core/src/cells/bandwidth/queue/pie.rs +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -341,7 +341,6 @@ mod tests { fn test_pie_queue_basic() { let config = PieQueueConfig::default(); let mut queue: PieQueue = PieQueue::new(config); - assert!(queue.is_empty()); let pkt1 = create_packet(500); @@ -411,7 +410,6 @@ mod tests { fn test_pie_queue_work_conserving() { let config = PieQueueConfig::default(); let mut queue: PieQueue = PieQueue::new(config.clone()); - queue.p = 1.0; queue.burst_allowance = 0.0; From 94d045074777bfbd7fe5b41a85b20c310cf29450 Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Tue, 9 Jun 2026 19:00:10 +0800 Subject: [PATCH 08/12] fix un-lock-able's review on ared --- rattan-core/src/cells/bandwidth/queue/ared.rs | 97 ++++++++----------- rattan-core/src/cells/bandwidth/queue/pie.rs | 63 ++++++------ rattan-core/src/cells/bandwidth/queue/red.rs | 66 ++++++------- 3 files changed, 100 insertions(+), 126 deletions(-) diff --git a/rattan-core/src/cells/bandwidth/queue/ared.rs b/rattan-core/src/cells/bandwidth/queue/ared.rs index 22c45d1..fd3b794 100644 --- a/rattan-core/src/cells/bandwidth/queue/ared.rs +++ b/rattan-core/src/cells/bandwidth/queue/ared.rs @@ -1,6 +1,5 @@ // Adaptive RED Queue Implementation Reference: -// https://www.icir.org/floyd/papers/adaptiveRed.pdf#:~:text=We%20find%20that%20this%20re-vised%20version%20of%20Adaptive,length%20in%20a%20wide%20variety%20of%20traffic%20scenarios. - +// https://www.icir.org/floyd/papers/adaptiveRed.pdf use std::collections::VecDeque; #[cfg(feature = "serde")] @@ -129,20 +128,18 @@ impl

AdaptiveRedQueue

where P: Packet, { - fn update_avg (&mut self) { - match self.is_empty() { - false => { - self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + self.config.w_q * (self.now_bytes as f64) - }, - true => { - if let Some(idle_start) = self.idle_start { - let now = Instant::now(); - let idle_duration = now.saturating_duration_since(idle_start); - let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; - self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); - self.idle_start = Some(now); - } - } + fn update_avg(&mut self) { + if !self.is_empty() { + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + self.config.w_q * (self.now_bytes as f64); + return; + } + + if let Some(idle_start) = self.idle_start { + let now = Instant::now(); + let idle_duration = now.saturating_duration_since(idle_start); + let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; + self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); + self.idle_start = Some(now); } } @@ -210,55 +207,47 @@ where self.latest_max_p_update = now; } - if self - .config - .packet_limit - .is_none_or(|limit| self.queue.len() < limit) - && self.config.byte_limit.is_none_or(|limit| { - self.now_bytes + packet.l3_length() + self.config.bw_type.extra_length() <= limit - }) - { - let packet_size = packet.l3_length() + self.get_extra_length(); - - match self.should_drop() { - true => { - #[cfg(test)] - tracing::trace!( - avg = self.average_queue_length, - count = self.count_packet, - header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), - "Drop packet(l3_len: {}, extra_len: {}) due to ARED algorithm", packet.l3_length(), self.get_extra_length() - ); - return; - }, - false => { - self.now_bytes += packet_size; - self.queue.push_back(packet); - self.idle_start = None; - } - } - } else { + let packet_size = packet.l3_length() + self.get_extra_length(); + let pass_hard_limit = self.config.packet_limit.is_none_or(|limit| self.queue.len() < limit) + && self.config.byte_limit.is_none_or(|limit| self.now_bytes + packet_size <= limit); + + if !pass_hard_limit { self.count_packet = 0; #[cfg(test)] tracing::trace!( queue_len = self.queue.len(), now_bytes = self.now_bytes, header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), - "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() + ); + return; + } + + if self.should_drop() { + #[cfg(test)] + tracing::trace!( + avg = self.average_queue_length, + count = self.count_packet, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to ARED algorithm", packet.l3_length(), self.get_extra_length() ); + return; } + + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; } fn dequeue(&mut self) -> Option

{ - match self.queue.pop_front() { - Some(packet ) => { - self.now_bytes -= packet.l3_length() + self.get_extra_length(); - if self.is_empty() { - self.idle_start = Some(Instant::now()); - } - Some(packet) - }, - None => None + if let Some(packet) = self.queue.pop_front() { + self.now_bytes -= packet.l3_length() + self.get_extra_length(); + if self.is_empty() { + self.idle_start = Some(Instant::now()); + } + Some(packet) + } else { + None } } diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs index 0c1d01c..22d0e35 100644 --- a/rattan-core/src/cells/bandwidth/queue/pie.rs +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -246,40 +246,34 @@ where self.update_drop_probability(); } - // hard limit check - if self - .config - .packet_limit - .is_none_or(|limit| self.queue.len() < limit) - && self.config.byte_limit.is_none_or(|limit| { - self.now_bytes + packet.l3_length() + self.config.bw_type.extra_length() <= limit - }) - { - match self.should_drop() { - true => { - #[cfg(test)] - tracing::trace!( - p = self.p, - old_delay = self.old_del, - header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), - "Drop packet(l3_len: {}, extra_len: {}) due to PIE algorithm", packet.l3_length(), self.get_extra_length() - ); - return; - }, - false => { - self.now_bytes += packet.l3_length() + self.get_extra_length(); - self.queue.push_back(packet); - } - } - } else { + let packet_size = packet.l3_length() + self.get_extra_length(); + let pass_hard_limit = self.config.packet_limit.is_none_or(|limit| self.queue.len() < limit) + && self.config.byte_limit.is_none_or(|limit| self.now_bytes + packet_size <= limit); + + if !pass_hard_limit { #[cfg(test)] tracing::trace!( queue_len = self.queue.len(), now_bytes = self.now_bytes, header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), - "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() + ); + return; + } + + if self.should_drop() { + #[cfg(test)] + tracing::trace!( + p = self.p, + old_delay = self.old_del, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to PIE algorithm", packet.l3_length(), self.get_extra_length() ); + return; } + + self.now_bytes += packet_size; + self.queue.push_back(packet); } fn dequeue(&mut self) -> Option

{ @@ -289,14 +283,13 @@ where self.update_drop_probability(); } - match self.queue.pop_front() { - Some(packet) => { - let pkt_size = packet.l3_length() + self.get_extra_length(); - self.now_bytes -= pkt_size; - self.update_avg_drate(pkt_size); - Some(packet) - }, - None => None, + if let Some(packet) = self.queue.pop_front() { + let pkt_size = packet.l3_length() + self.get_extra_length(); + self.now_bytes -= pkt_size; + self.update_avg_drate(pkt_size); + Some(packet) + } else { + None } } diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs index 0f9becb..c5389aa 100644 --- a/rattan-core/src/cells/bandwidth/queue/red.rs +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -193,34 +193,11 @@ where fn enqueue(&mut self, packet: P) { self.update_avg(); - if self - .config - .packet_limit - .is_none_or(|limit| self.queue.len() < limit) - && self.config.byte_limit.is_none_or(|limit| { - self.now_bytes + packet.l3_length() + self.config.bw_type.extra_length() <= limit - }) - { - let packet_size = packet.l3_length() + self.get_extra_length(); - - match self.should_drop() { - true => { - #[cfg(test)] - tracing::trace!( - avg = self.average_queue_length, - count = self.count_packet, - header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), - "Drop packet(l3_len: {}, extra_len: {}) due to RED algorithm", packet.l3_length(), self.get_extra_length() - ); - return; - }, - false => { - self.now_bytes += packet_size; - self.queue.push_back(packet); - self.idle_start = None; - } - } - } else { + let packet_size = packet.l3_length() + self.get_extra_length(); + let pass_hard_limit = self.config.packet_limit.is_none_or(|limit| self.queue.len() < limit) + && self.config.byte_limit.is_none_or(|limit| self.now_bytes + packet_size <= limit); + + if !pass_hard_limit { self.count_packet = 0; #[cfg(test)] tracing::trace!( @@ -229,19 +206,34 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() ); + return; } + + if self.should_drop() { + #[cfg(test)] + tracing::trace!( + avg = self.average_queue_length, + count = self.count_packet, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to RED algorithm", packet.l3_length(), self.get_extra_length() + ); + return; + } + + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; } fn dequeue(&mut self) -> Option

{ - match self.queue.pop_front() { - Some(packet ) => { - self.now_bytes -= packet.l3_length() + self.get_extra_length(); - if self.is_empty() { - self.idle_start = Some(Instant::now()); - } - Some(packet) - }, - None => None + if let Some(packet) = self.queue.pop_front() { + self.now_bytes -= packet.l3_length() + self.get_extra_length(); + if self.is_empty() { + self.idle_start = Some(Instant::now()); + } + Some(packet) + } else { + None } } From d86554a6b1e5150aa488949cb9a297a8d1cd948d Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Tue, 9 Jun 2026 19:22:17 +0800 Subject: [PATCH 09/12] fix warnings from cargo clippy (in github actions) and run cargo fmt --- rattan-core/src/cells/bandwidth/queue/ared.rs | 184 +++++++++++------- rattan-core/src/cells/bandwidth/queue/mod.rs | 8 +- rattan-core/src/cells/bandwidth/queue/pie.rs | 129 +++++++----- rattan-core/src/cells/bandwidth/queue/red.rs | 157 +++++++++------ 4 files changed, 296 insertions(+), 182 deletions(-) diff --git a/rattan-core/src/cells/bandwidth/queue/ared.rs b/rattan-core/src/cells/bandwidth/queue/ared.rs index fd3b794..6be1a5f 100644 --- a/rattan-core/src/cells/bandwidth/queue/ared.rs +++ b/rattan-core/src/cells/bandwidth/queue/ared.rs @@ -2,10 +2,10 @@ // https://www.icir.org/floyd/papers/adaptiveRed.pdf use std::collections::VecDeque; +use rand::random_range; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use tokio::time::{Instant, Duration}; -use rand::random_range; +use tokio::time::{Duration, Instant}; use tracing::{debug, warn}; #[cfg(feature = "serde")] @@ -18,13 +18,19 @@ use crate::cells::Packet; pub struct AdaptiveRedQueueConfig { pub packet_limit: Option, pub byte_limit: Option, - pub w_q: f64, // queue weight for calculating the average queue length + pub w_q: f64, // queue weight for calculating the average queue length pub min_th: usize, // minimum threshold of average queue length pub max_th: usize, // maximum threshold of average queue length - pub max_p: f64, // maximum probability of dropping a packet - #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub max_p: f64, // maximum probability of dropping a packet + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] pub pkt_tx_time: Duration, // typical packet tx time (us) - #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] pub bw_type: BwType, } @@ -34,7 +40,7 @@ impl Default for AdaptiveRedQueueConfig { packet_limit: None, byte_limit: None, w_q: 0.002, - min_th: 7500, // 5 * 1500 bytes + min_th: 7500, // 5 * 1500 bytes max_th: 22500, // 15 * 1500 bytes max_p: 0.02, pkt_tx_time: Duration::from_micros(120), // 1500 bytes * 8 / 100Mbps = 120 us @@ -44,6 +50,7 @@ impl Default for AdaptiveRedQueueConfig { } impl AdaptiveRedQueueConfig { + #[allow(clippy::too_many_arguments)] pub fn new>, B: Into>>( packet_limit: A, byte_limit: B, @@ -52,7 +59,7 @@ impl AdaptiveRedQueueConfig { max_th: usize, max_p: f64, pkt_tx_time: Duration, - bw_type: BwType + bw_type: BwType, ) -> Self { // Warning: The caller must ensure that the parameters are valid. // It's recommended to do validation before calling this function, @@ -69,7 +76,7 @@ impl AdaptiveRedQueueConfig { if !(0.0..=1.0).contains(&max_p) { warn!("AdaptiveRedQueueConfig: max_p ({}) is out of expected range [0.0, 1.0]. This is a probability.", max_p); } - + Self { packet_limit: packet_limit.into(), byte_limit: byte_limit.into(), @@ -78,7 +85,7 @@ impl AdaptiveRedQueueConfig { max_th, max_p, pkt_tx_time, - bw_type + bw_type, } } } @@ -95,8 +102,8 @@ pub struct AdaptiveRedQueue

{ config: AdaptiveRedQueueConfig, now_bytes: usize, // for calculating average_queue_length average_queue_length: f64, - count_packet: i32, // number of packets since last dropping - idle_start: Option, // start time of current idle period + count_packet: i32, // number of packets since last dropping + idle_start: Option, // start time of current idle period latest_max_p_update: Instant, // latest time when max_p is updates } @@ -117,7 +124,7 @@ impl

AdaptiveRedQueue

{ impl

Default for AdaptiveRedQueue

where - P: Packet + P: Packet, { fn default() -> Self { Self::new(AdaptiveRedQueueConfig::default()) @@ -126,11 +133,12 @@ where impl

AdaptiveRedQueue

where - P: Packet, + P: Packet, { fn update_avg(&mut self) { if !self.is_empty() { - self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + self.config.w_q * (self.now_bytes as f64); + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + + self.config.w_q * (self.now_bytes as f64); return; } @@ -143,7 +151,7 @@ where } } - fn should_drop (&mut self) -> bool { + fn should_drop(&mut self) -> bool { let avg = self.average_queue_length; let min_th = self.config.min_th as f64; let max_th = self.config.max_th as f64; @@ -156,7 +164,7 @@ where p_b / (1.0 - self.count_packet as f64 * p_b) }; - let rand_val = random_range(0.0 .. 1.0); + let rand_val = random_range(0.0..1.0); if rand_val < p_a { self.count_packet = 0; true @@ -172,9 +180,11 @@ where } } - fn update_max_p (&mut self) { - let target_min = self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64; - let target_max = self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64; + fn update_max_p(&mut self) { + let target_min = + self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64; + let target_max = + self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64; if self.average_queue_length > target_max && self.config.max_p <= 0.5 { self.config.max_p += (self.config.max_p / 4.0).min(0.01); } else if self.average_queue_length < target_min && self.config.max_p >= 0.01 { @@ -208,8 +218,14 @@ where } let packet_size = packet.l3_length() + self.get_extra_length(); - let pass_hard_limit = self.config.packet_limit.is_none_or(|limit| self.queue.len() < limit) - && self.config.byte_limit.is_none_or(|limit| self.now_bytes + packet_size <= limit); + let pass_hard_limit = self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self + .config + .byte_limit + .is_none_or(|limit| self.now_bytes + packet_size <= limit); if !pass_hard_limit { self.count_packet = 0; @@ -231,6 +247,7 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to ARED algorithm", packet.l3_length(), self.get_extra_length() ); + #[allow(clippy::needless_return)] return; } @@ -290,9 +307,11 @@ mod tests { #[test_log::test] fn test_ared_queue_basic() { - let mut config = AdaptiveRedQueueConfig::default(); - config.min_th = 1000; - config.max_th = 2000; + let config = AdaptiveRedQueueConfig { + min_th: 1000, + max_th: 2000, + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); assert!(queue.is_empty()); @@ -309,10 +328,12 @@ mod tests { #[test_log::test] fn test_ared_queue_hard_limit_packet() { - let mut config = AdaptiveRedQueueConfig::default(); - config.packet_limit = Some(2); - config.min_th = 100000; // avoid red drop - config.max_th = 200000; + let config = AdaptiveRedQueueConfig { + packet_limit: Some(2), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); queue.enqueue(create_packet(100)); @@ -326,10 +347,12 @@ mod tests { #[test_log::test] fn test_ared_queue_hard_limit_byte() { - let mut config = AdaptiveRedQueueConfig::default(); - config.byte_limit = Some(150); - config.min_th = 100000; // avoid red drop - config.max_th = 200000; + let config = AdaptiveRedQueueConfig { + byte_limit: Some(150), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); queue.enqueue(create_packet(100)); // l3 length 86. @@ -342,34 +365,42 @@ mod tests { #[test_log::test] fn test_ared_queue_max_th_drop() { - let mut config = AdaptiveRedQueueConfig::default(); - config.min_th = 100; - config.max_th = 200; - config.w_q = 1.0; // max weight, avg matches instantly + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 200, + w_q: 1.0, // max weight, avg matches instantly + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); // First packet - queue.enqueue(create_packet(100)); - + queue.enqueue(create_packet(100)); + // Second packet queue.enqueue(create_packet(300)); - + // At this point, queue length is 2, now_bytes is high enough. // The next enqueue should see average_queue_length > max_th and drop the packet. let before_len = queue.length(); queue.enqueue(create_packet(100)); - assert_eq!(queue.length(), before_len, "Packet should be dropped by ARED max_th"); + assert_eq!( + queue.length(), + before_len, + "Packet should be dropped by ARED max_th" + ); } #[test_log::test] fn test_ared_queue_min_th_no_drop() { - let mut config = AdaptiveRedQueueConfig::default(); - config.min_th = 1000; - config.max_th = 2000; - config.w_q = 1.0; // Instantly reach exact byte size + let config = AdaptiveRedQueueConfig { + min_th: 1000, + max_th: 2000, + w_q: 1.0, // Instantly reach exact byte size + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); - // First packet: queue empty, avg remains 0. + // First packet: queue empty, avg remains 0. queue.enqueue(create_packet(514)); // L3 size = 514 - 14 (Ethernet header) = 500 assert_eq!(queue.length(), 1); @@ -377,50 +408,57 @@ mod tests { // 500 < min_th(1000), so it should not drop. queue.enqueue(create_packet(414)); // L3 size = 400 assert_eq!(queue.length(), 2); - + // Check internal state: count_packet is -1 when avg < min_th assert_eq!(queue.count_packet, -1); } #[test_log::test] fn test_ared_queue_probabilistic_drop() { - let mut config = AdaptiveRedQueueConfig::default(); - config.min_th = 100; - config.max_th = 300; - config.max_p = 0.5; - config.w_q = 1.0; + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 300, + max_p: 0.5, + w_q: 1.0, + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); // First packet: queue empty, avg = 0. L3 size = 200. - queue.enqueue(create_packet(214)); + queue.enqueue(create_packet(214)); assert_eq!(queue.length(), 1); let mut drop_count = 0; let total_packets = 1000; - + for _ in 0..total_packets { // enqueue packets with L3 size 0 (total size 14). // now_bytes stays at 200. w_q=1.0 makes avg exactly 200. // 100 (min_th) <= avg(200) < 300 (max_th), entering probabilistic drop zone. let before = queue.length(); - queue.enqueue(create_packet(14)); + queue.enqueue(create_packet(14)); if queue.length() == before { drop_count += 1; } } - + // It should drop some packets, but not all of them - assert!(drop_count > 0, "Should have dropped some packets probabilistically"); + assert!( + drop_count > 0, + "Should have dropped some packets probabilistically" + ); assert!(drop_count < total_packets, "Should not drop all packets"); } #[test_log::test] fn test_ared_queue_max_p_increase() { - let mut config = AdaptiveRedQueueConfig::default(); - config.min_th = 100; - config.max_th = 200; - config.max_p = 0.02; - config.w_q = 1.0; // Instantly update avg + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 200, + max_p: 0.02, + w_q: 1.0, // Instantly update avg + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); // enqueue to make average_queue_length > target_max @@ -441,16 +479,21 @@ mod tests { queue.enqueue(create_packet(14)); let after_max_p = queue.config.max_p; - assert!(after_max_p > before_max_p, "max_p should increase when avg > target_max"); + assert!( + after_max_p > before_max_p, + "max_p should increase when avg > target_max" + ); } #[test_log::test] fn test_ared_queue_max_p_decrease() { - let mut config = AdaptiveRedQueueConfig::default(); - config.min_th = 100; - config.max_th = 200; - config.max_p = 0.05; // Starting with a high max_p - config.w_q = 1.0; + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 200, + max_p: 0.05, // Starting with a high max_p + w_q: 1.0, + ..Default::default() + }; let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); // enqueue to make average_queue_length < target_min @@ -469,6 +512,9 @@ mod tests { queue.enqueue(create_packet(14)); let after_max_p = queue.config.max_p; - assert!(after_max_p < before_max_p, "max_p should decrease when avg < target_min"); + assert!( + after_max_p < before_max_p, + "max_p should decrease when avg < target_min" + ); } } diff --git a/rattan-core/src/cells/bandwidth/queue/mod.rs b/rattan-core/src/cells/bandwidth/queue/mod.rs index 40c2065..f4c9c22 100644 --- a/rattan-core/src/cells/bandwidth/queue/mod.rs +++ b/rattan-core/src/cells/bandwidth/queue/mod.rs @@ -11,21 +11,21 @@ use tokio::time::Instant; use super::BwType; use crate::cells::{Packet, LARGE_DURATION}; +mod ared; mod codel; mod drophead; mod droptail; mod infinite; -mod red; -mod ared; mod pie; +mod red; +pub use ared::*; pub use codel::*; pub use drophead::*; pub use droptail::*; pub use infinite::*; -pub use red::*; -pub use ared::*; pub use pie::*; +pub use red::*; #[cfg(feature = "serde")] fn serde_default(t: &T) -> bool { diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs index 22d0e35..50318dc 100644 --- a/rattan-core/src/cells/bandwidth/queue/pie.rs +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -1,15 +1,15 @@ // PIE Queue Implementation Reference: // https://www.rfc-editor.org/info/rfc8033 // https://ieeexplore.ieee.org/document/6602305 -// Reproduced according to RFC 8033 Appendix B, +// Reproduced according to RFC 8033 Appendix B, // rather than original paper or RFC Appendix A. use std::collections::VecDeque; +use rand::random_range; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use tokio::time::{Instant, Duration}; -use rand::random_range; +use tokio::time::{Duration, Instant}; use tracing::debug; #[cfg(feature = "serde")] @@ -22,14 +22,17 @@ use crate::cells::Packet; pub struct PieQueueConfig { pub packet_limit: Option, pub byte_limit: Option, - pub ref_del: f64, // target delay (sec) - pub t_update: Duration, // update interval - pub tilde_alpha: f64, // base value of alpha (Hz, 1/sec) - pub tilde_beta: f64, // base value of beta (Hz, 1/sec) + pub ref_del: f64, // target delay (sec) + pub t_update: Duration, // update interval + pub tilde_alpha: f64, // base value of alpha (Hz, 1/sec) + pub tilde_beta: f64, // base value of beta (Hz, 1/sec) pub dq_threshold: usize, // threshold of queue length (bytes) - pub epsilon: f64, // EWMA weight - pub max_burst: f64, // MAX_BURST (ms) - #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub epsilon: f64, // EWMA weight + pub max_burst: f64, // MAX_BURST (ms) + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] pub bw_type: BwType, } @@ -51,6 +54,7 @@ impl Default for PieQueueConfig { } impl PieQueueConfig { + #[allow(clippy::too_many_arguments)] pub fn new>, B: Into>>( packet_limit: A, byte_limit: B, @@ -61,7 +65,7 @@ impl PieQueueConfig { dq_threshold: usize, epsilon: f64, max_burst: f64, - bw_type: BwType + bw_type: BwType, ) -> Self { Self { packet_limit: packet_limit.into(), @@ -73,7 +77,7 @@ impl PieQueueConfig { dq_threshold, epsilon, max_burst, - bw_type + bw_type, } } } @@ -89,13 +93,13 @@ pub struct PieQueue

{ queue: VecDeque

, config: PieQueueConfig, now_bytes: usize, - old_del: f64, // previous delay (sec) - p: f64, // current drop probability - dq_count: usize, // departure count (bytes) - start_update: Instant, // start time of t_update + old_del: f64, // previous delay (sec) + p: f64, // current drop probability + dq_count: usize, // departure count (bytes) + start_update: Instant, // start time of t_update start_measurement: Option, // Some(Instant) when in a measurement cycle, None when quit - avg_drate: f64, - burst_allowance: f64 + avg_drate: f64, + burst_allowance: f64, } impl

PieQueue

{ @@ -112,14 +116,14 @@ impl

PieQueue

{ start_update: Instant::now(), start_measurement: None, avg_drate: 0.0, - burst_allowance: max_burst + burst_allowance: max_burst, } } } impl

Default for PieQueue

-where - P: Packet +where + P: Packet, { fn default() -> Self { Self::new(PieQueueConfig::default()) @@ -132,8 +136,11 @@ where { fn update_drop_probability(&mut self) { let now = Instant::now(); - let elapsed_ms = now.saturating_duration_since(self.start_update).as_secs_f64() * 1000.0; - + let elapsed_ms = now + .saturating_duration_since(self.start_update) + .as_secs_f64() + * 1000.0; + let cur_del = if self.avg_drate.abs() < f64::EPSILON { 0.0 } else { @@ -156,14 +163,17 @@ where p_increment /= 2.0; } self.p += p_increment; - // RFC 8033 Section 4.2: Exponential decay when system is not congested + // RFC 8033 Section 4.2: Exponential decay when system is not congested if cur_del < self.config.ref_del / 2.0 && self.old_del < self.config.ref_del / 2.0 { self.p *= 0.98; - } + } self.p = self.p.clamp(0.0, 1.0); // RFC 8033 Section 4.4: Burst Tolerance - if self.p < f64::EPSILON && cur_del < self.config.ref_del / 2.0 && self.old_del < self.config.ref_del / 2.0 { + if self.p < f64::EPSILON + && cur_del < self.config.ref_del / 2.0 + && self.old_del < self.config.ref_del / 2.0 + { self.burst_allowance = self.config.max_burst; } else { self.burst_allowance = (self.burst_allowance - elapsed_ms).max(0.0); @@ -179,13 +189,13 @@ where } // RFC 8033 Section 4.1: Bypass random drop logic to be work conserving - let bypass_drop = (self.old_del < self.config.ref_del / 2.0 && self.p < 0.2) - || self.queue.len() <= 2; + let bypass_drop = + (self.old_del < self.config.ref_del / 2.0 && self.p < 0.2) || self.queue.len() <= 2; if bypass_drop { return false; } - let rand_val = random_range(0.0 .. 1.0); + let rand_val = random_range(0.0..1.0); rand_val < self.p } @@ -208,7 +218,8 @@ where if self.avg_drate.abs() < f64::EPSILON { self.avg_drate = dq_rate; } else { - self.avg_drate = (1.0 - self.config.epsilon) * self.avg_drate + self.config.epsilon * dq_rate; + self.avg_drate = (1.0 - self.config.epsilon) * self.avg_drate + + self.config.epsilon * dq_rate; } self.start_measurement = Some(now); self.dq_count = 0; @@ -244,11 +255,17 @@ where let interval_update = Instant::now().saturating_duration_since(self.start_update); if interval_update >= self.config.t_update { self.update_drop_probability(); - } - + } + let packet_size = packet.l3_length() + self.get_extra_length(); - let pass_hard_limit = self.config.packet_limit.is_none_or(|limit| self.queue.len() < limit) - && self.config.byte_limit.is_none_or(|limit| self.now_bytes + packet_size <= limit); + let pass_hard_limit = self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self + .config + .byte_limit + .is_none_or(|limit| self.now_bytes + packet_size <= limit); if !pass_hard_limit { #[cfg(test)] @@ -258,6 +275,7 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() ); + #[allow(clippy::needless_return)] return; } @@ -269,6 +287,7 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to PIE algorithm", packet.l3_length(), self.get_extra_length() ); + #[allow(clippy::needless_return)] return; } @@ -313,7 +332,7 @@ where } fn retain(&mut self, mut f: F) - where + where F: FnMut(&P) -> bool, { self.queue.retain(|packet| f(packet)); @@ -348,8 +367,10 @@ mod tests { #[test_log::test] fn test_pie_queue_hard_limit_packet() { - let mut config = PieQueueConfig::default(); - config.packet_limit = Some(2); + let config = PieQueueConfig { + packet_limit: Some(2), + ..Default::default() + }; let mut queue: PieQueue = PieQueue::new(config); queue.enqueue(create_packet(100)); @@ -363,8 +384,10 @@ mod tests { #[test_log::test] fn test_pie_queue_hard_limit_byte() { - let mut config = PieQueueConfig::default(); - config.byte_limit = Some(150); + let config = PieQueueConfig { + byte_limit: Some(150), + ..Default::default() + }; let mut queue: PieQueue = PieQueue::new(config); queue.enqueue(create_packet(100)); // l3 length 86. @@ -383,7 +406,7 @@ mod tests { // Force a high drop probability queue.p = 1.0; queue.burst_allowance = 100.0; - + // burst_allowance > 0 bypasses random drop queue.enqueue(create_packet(100)); assert_eq!(queue.length(), 1); @@ -428,8 +451,10 @@ mod tests { #[test_log::test] fn test_pie_queue_avg_drate_update() { - let mut config = PieQueueConfig::default(); - config.dq_threshold = 50; // Small threshold + let config = PieQueueConfig { + dq_threshold: 50, // Small threshold + ..Default::default() + }; let mut queue: PieQueue = PieQueue::new(config); queue.enqueue(create_packet(114)); // l3 length 100 @@ -447,7 +472,10 @@ mod tests { // Second dequeue triggers calculation of avg_drate queue.dequeue(); // dequeues 100 bytes assert!(queue.avg_drate > 0.0, "avg_drate should be calculated"); - assert!(queue.start_measurement.is_none(), "Should exit measurement cycle since queue is empty"); + assert!( + queue.start_measurement.is_none(), + "Should exit measurement cycle since queue is empty" + ); } #[test_log::test] @@ -458,12 +486,15 @@ mod tests { // Fake high delay queue.avg_drate = 1000.0; queue.now_bytes = 100000; // delay = 100.0s > ref_del - + std::thread::sleep(config.t_update); // Wait to exceed t_update - + // This enqueue will trigger update_drop_probability() - queue.enqueue(create_packet(14)); - - assert!(queue.p > 0.0, "Probability should increase when delay is high"); + queue.enqueue(create_packet(14)); + + assert!( + queue.p > 0.0, + "Probability should increase when delay is high" + ); } -} \ No newline at end of file +} diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs index c5389aa..f5b60a6 100644 --- a/rattan-core/src/cells/bandwidth/queue/red.rs +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -4,10 +4,10 @@ use std::collections::VecDeque; +use rand::random_range; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use tokio::time::{Instant, Duration}; -use rand::random_range; +use tokio::time::{Duration, Instant}; use tracing::{debug, warn}; #[cfg(feature = "serde")] @@ -20,13 +20,19 @@ use crate::cells::Packet; pub struct RedQueueConfig { pub packet_limit: Option, pub byte_limit: Option, - pub w_q: f64, // queue weight for calculating the average queue length + pub w_q: f64, // queue weight for calculating the average queue length pub min_th: usize, // minimum threshold of average queue length pub max_th: usize, // maximum threshold of average queue length - pub max_p: f64, // maximum probability of dropping a packet - #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + pub max_p: f64, // maximum probability of dropping a packet + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] pub pkt_tx_time: Duration, // typical packet tx time (us) - #[cfg_attr(feature = "serde", serde(default, skip_serializing_if = "serde_default"))] + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] pub bw_type: BwType, } @@ -36,7 +42,7 @@ impl Default for RedQueueConfig { packet_limit: None, byte_limit: None, w_q: 0.002, - min_th: 7500, // 5 * 1500 bytes + min_th: 7500, // 5 * 1500 bytes max_th: 22500, // 15 * 1500 bytes max_p: 0.02, pkt_tx_time: Duration::from_micros(120), // 1500 bytes * 8 / 100Mbps = 120 us @@ -46,6 +52,7 @@ impl Default for RedQueueConfig { } impl RedQueueConfig { + #[allow(clippy::too_many_arguments)] pub fn new>, B: Into>>( packet_limit: A, byte_limit: B, @@ -54,13 +61,16 @@ impl RedQueueConfig { max_th: usize, max_p: f64, pkt_tx_time: Duration, - bw_type: BwType + bw_type: BwType, ) -> Self { // Warning: The caller must ensure that the parameters are valid. // It's recommended to do validation before calling this function, // or we may need to return a Result instead of Self in the future. if min_th >= max_th { - warn!("RedQueueConfig: min_th ({}) >= max_th ({}), which may cause invalid behavior.", min_th, max_th); + warn!( + "RedQueueConfig: min_th ({}) >= max_th ({}), which may cause invalid behavior.", + min_th, max_th + ); } if pkt_tx_time.as_micros() == 0 { warn!("RedQueueConfig: pkt_tx_time is 0, which will cause divide-by-zero in m calculation."); @@ -71,7 +81,7 @@ impl RedQueueConfig { if !(0.0..=1.0).contains(&max_p) { warn!("RedQueueConfig: max_p ({}) is out of expected range [0.0, 1.0]. This is a probability.", max_p); } - + Self { packet_limit: packet_limit.into(), byte_limit: byte_limit.into(), @@ -80,7 +90,7 @@ impl RedQueueConfig { max_th, max_p, pkt_tx_time, - bw_type + bw_type, } } } @@ -97,7 +107,7 @@ pub struct RedQueue

{ config: RedQueueConfig, now_bytes: usize, // for calculating average_queue_length average_queue_length: f64, - count_packet: i32, // number of packets since last dropping + count_packet: i32, // number of packets since last dropping idle_start: Option, // start time of current idle period } @@ -117,7 +127,7 @@ impl

RedQueue

{ impl

Default for RedQueue

where - P: Packet + P: Packet, { fn default() -> Self { Self::new(RedQueueConfig::default()) @@ -126,18 +136,20 @@ where impl

RedQueue

where - P: Packet, + P: Packet, { - fn update_avg (&mut self) { + fn update_avg(&mut self) { match self.is_empty() { false => { - self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + self.config.w_q * (self.now_bytes as f64) - }, + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + + self.config.w_q * (self.now_bytes as f64) + } true => { if let Some(idle_start) = self.idle_start { let now = Instant::now(); let idle_duration = now.saturating_duration_since(idle_start); - let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; + let m = idle_duration.as_micros() as f64 + / self.config.pkt_tx_time.as_micros() as f64; self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); self.idle_start = Some(now); } @@ -145,7 +157,7 @@ where } } - fn should_drop (&mut self) -> bool { + fn should_drop(&mut self) -> bool { let avg = self.average_queue_length; let min_th = self.config.min_th as f64; let max_th = self.config.max_th as f64; @@ -158,7 +170,7 @@ where p_b / (1.0 - self.count_packet as f64 * p_b) }; - let rand_val = random_range(0.0 .. 1.0); + let rand_val = random_range(0.0..1.0); if rand_val < p_a { self.count_packet = 0; true @@ -192,10 +204,16 @@ where fn enqueue(&mut self, packet: P) { self.update_avg(); - + let packet_size = packet.l3_length() + self.get_extra_length(); - let pass_hard_limit = self.config.packet_limit.is_none_or(|limit| self.queue.len() < limit) - && self.config.byte_limit.is_none_or(|limit| self.now_bytes + packet_size <= limit); + let pass_hard_limit = self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self + .config + .byte_limit + .is_none_or(|limit| self.now_bytes + packet_size <= limit); if !pass_hard_limit { self.count_packet = 0; @@ -217,6 +235,7 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to RED algorithm", packet.l3_length(), self.get_extra_length() ); + #[allow(clippy::needless_return)] return; } @@ -264,7 +283,6 @@ where } } - #[cfg(test)] mod tests { use super::*; @@ -277,9 +295,11 @@ mod tests { #[test_log::test] fn test_red_queue_basic() { - let mut config = RedQueueConfig::default(); - config.min_th = 1000; - config.max_th = 2000; + let config = RedQueueConfig { + min_th: 1000, + max_th: 2000, + ..Default::default() + }; let mut queue: RedQueue = RedQueue::new(config); assert!(queue.is_empty()); @@ -296,10 +316,12 @@ mod tests { #[test_log::test] fn test_red_queue_hard_limit_packet() { - let mut config = RedQueueConfig::default(); - config.packet_limit = Some(2); - config.min_th = 100000; // avoid red drop - config.max_th = 200000; + let config = RedQueueConfig { + packet_limit: Some(2), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; let mut queue: RedQueue = RedQueue::new(config); queue.enqueue(create_packet(100)); @@ -313,10 +335,12 @@ mod tests { #[test_log::test] fn test_red_queue_hard_limit_byte() { - let mut config = RedQueueConfig::default(); - config.byte_limit = Some(150); - config.min_th = 100000; // avoid red drop - config.max_th = 200000; + let config = RedQueueConfig { + byte_limit: Some(150), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; let mut queue: RedQueue = RedQueue::new(config); queue.enqueue(create_packet(100)); // l3 length 86. @@ -329,34 +353,42 @@ mod tests { #[test_log::test] fn test_red_queue_max_th_drop() { - let mut config = RedQueueConfig::default(); - config.min_th = 100; - config.max_th = 200; - config.w_q = 1.0; // max weight, avg matches instantly + let config = RedQueueConfig { + min_th: 100, + max_th: 200, + w_q: 1.0, // max weight, avg matches instantly + ..Default::default() + }; let mut queue: RedQueue = RedQueue::new(config); // First packet - queue.enqueue(create_packet(100)); - + queue.enqueue(create_packet(100)); + // Second packet queue.enqueue(create_packet(300)); - + // At this point, queue length is 2, now_bytes is high enough. // The next enqueue should see average_queue_length > max_th and drop the packet. let before_len = queue.length(); queue.enqueue(create_packet(100)); - assert_eq!(queue.length(), before_len, "Packet should be dropped by RED max_th"); + assert_eq!( + queue.length(), + before_len, + "Packet should be dropped by RED max_th" + ); } #[test_log::test] fn test_red_queue_min_th_no_drop() { - let mut config = RedQueueConfig::default(); - config.min_th = 1000; - config.max_th = 2000; - config.w_q = 1.0; // Instantly reach exact byte size + let config = RedQueueConfig { + min_th: 1000, + max_th: 2000, + w_q: 1.0, // Instantly reach exact byte size + ..Default::default() + }; let mut queue: RedQueue = RedQueue::new(config); - // First packet: queue empty, avg remains 0. + // First packet: queue empty, avg remains 0. queue.enqueue(create_packet(514)); // L3 size = 514 - 14 (Ethernet header) = 500 assert_eq!(queue.length(), 1); @@ -364,40 +396,45 @@ mod tests { // 500 < min_th(1000), so it should not drop. queue.enqueue(create_packet(414)); // L3 size = 400 assert_eq!(queue.length(), 2); - + // Check internal state: count_packet is -1 when avg < min_th assert_eq!(queue.count_packet, -1); } #[test_log::test] fn test_red_queue_probabilistic_drop() { - let mut config = RedQueueConfig::default(); - config.min_th = 100; - config.max_th = 300; - config.max_p = 0.5; - config.w_q = 1.0; + let config = RedQueueConfig { + min_th: 100, + max_th: 300, + max_p: 0.5, + w_q: 1.0, + ..Default::default() + }; let mut queue: RedQueue = RedQueue::new(config); // First packet: queue empty, avg = 0. L3 size = 200. - queue.enqueue(create_packet(214)); + queue.enqueue(create_packet(214)); assert_eq!(queue.length(), 1); let mut drop_count = 0; let total_packets = 1000; - + for _ in 0..total_packets { // enqueue packets with L3 size 0 (total size 14). // now_bytes stays at 200. w_q=1.0 makes avg exactly 200. // 100 (min_th) <= avg(200) < 300 (max_th), entering probabilistic drop zone. let before = queue.length(); - queue.enqueue(create_packet(14)); + queue.enqueue(create_packet(14)); if queue.length() == before { drop_count += 1; } } - + // It should drop some packets, but not all of them - assert!(drop_count > 0, "Should have dropped some packets probabilistically"); + assert!( + drop_count > 0, + "Should have dropped some packets probabilistically" + ); assert!(drop_count < total_packets, "Should not drop all packets"); } -} \ No newline at end of file +} From b482bd9dc9b9d4c780ee3f55f38b43eb983f7a95 Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Tue, 9 Jun 2026 19:36:01 +0800 Subject: [PATCH 10/12] fix function update_avg of RED --- rattan-core/src/cells/bandwidth/queue/red.rs | 27 +++++++++----------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs index f5b60a6..8c2d7a9 100644 --- a/rattan-core/src/cells/bandwidth/queue/red.rs +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -139,21 +139,18 @@ where P: Packet, { fn update_avg(&mut self) { - match self.is_empty() { - false => { - self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length - + self.config.w_q * (self.now_bytes as f64) - } - true => { - if let Some(idle_start) = self.idle_start { - let now = Instant::now(); - let idle_duration = now.saturating_duration_since(idle_start); - let m = idle_duration.as_micros() as f64 - / self.config.pkt_tx_time.as_micros() as f64; - self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); - self.idle_start = Some(now); - } - } + if !self.is_empty() { + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + + self.config.w_q * (self.now_bytes as f64); + return; + } + + if let Some(idle_start) = self.idle_start { + let now = Instant::now(); + let idle_duration = now.saturating_duration_since(idle_start); + let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; + self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); + self.idle_start = Some(now); } } From a0357918f2074e55a3d5991121a494bf7b2587ba Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Tue, 9 Jun 2026 23:43:10 +0800 Subject: [PATCH 11/12] fix clippy warning & corresponding logic --- rattan-core/src/cells/bandwidth/queue/ared.rs | 30 ++----- rattan-core/src/cells/bandwidth/queue/pie.rs | 84 +++++++------------ rattan-core/src/cells/bandwidth/queue/red.rs | 32 ++----- 3 files changed, 45 insertions(+), 101 deletions(-) diff --git a/rattan-core/src/cells/bandwidth/queue/ared.rs b/rattan-core/src/cells/bandwidth/queue/ared.rs index 6be1a5f..657149d 100644 --- a/rattan-core/src/cells/bandwidth/queue/ared.rs +++ b/rattan-core/src/cells/bandwidth/queue/ared.rs @@ -26,11 +26,6 @@ pub struct AdaptiveRedQueueConfig { feature = "serde", serde(default, skip_serializing_if = "serde_default") )] - pub pkt_tx_time: Duration, // typical packet tx time (us) - #[cfg_attr( - feature = "serde", - serde(default, skip_serializing_if = "serde_default") - )] pub bw_type: BwType, } @@ -43,14 +38,12 @@ impl Default for AdaptiveRedQueueConfig { min_th: 7500, // 5 * 1500 bytes max_th: 22500, // 15 * 1500 bytes max_p: 0.02, - pkt_tx_time: Duration::from_micros(120), // 1500 bytes * 8 / 100Mbps = 120 us bw_type: BwType::default(), } } } impl AdaptiveRedQueueConfig { - #[allow(clippy::too_many_arguments)] pub fn new>, B: Into>>( packet_limit: A, byte_limit: B, @@ -58,7 +51,6 @@ impl AdaptiveRedQueueConfig { min_th: usize, max_th: usize, max_p: f64, - pkt_tx_time: Duration, bw_type: BwType, ) -> Self { // Warning: The caller must ensure that the parameters are valid. @@ -67,9 +59,6 @@ impl AdaptiveRedQueueConfig { if min_th >= max_th { warn!("AdaptiveRedQueueConfig: min_th ({}) >= max_th ({}), which may cause invalid behavior.", min_th, max_th); } - if pkt_tx_time.as_micros() == 0 { - warn!("AdaptiveRedQueueConfig: pkt_tx_time is 0, which will cause divide-by-zero in m calculation."); - } if !(0.0..=1.0).contains(&w_q) { warn!("AdaptiveRedQueueConfig: w_q ({}) is out of expected range [0.0, 1.0]. This is an EWMA weight.", w_q); } @@ -84,7 +73,6 @@ impl AdaptiveRedQueueConfig { min_th, max_th, max_p, - pkt_tx_time, bw_type, } } @@ -145,7 +133,8 @@ where if let Some(idle_start) = self.idle_start { let now = Instant::now(); let idle_duration = now.saturating_duration_since(idle_start); - let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; + let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us + let m = idle_duration.as_micros() as f64 / pkt_tx_time; self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); self.idle_start = Some(now); } @@ -236,10 +225,7 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() ); - return; - } - - if self.should_drop() { + } else if self.should_drop() { #[cfg(test)] tracing::trace!( avg = self.average_queue_length, @@ -247,13 +233,11 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to ARED algorithm", packet.l3_length(), self.get_extra_length() ); - #[allow(clippy::needless_return)] - return; + } else { + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; } - - self.now_bytes += packet_size; - self.queue.push_back(packet); - self.idle_start = None; } fn dequeue(&mut self) -> Option

{ diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs index 50318dc..d2fcf17 100644 --- a/rattan-core/src/cells/bandwidth/queue/pie.rs +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -22,13 +22,8 @@ use crate::cells::Packet; pub struct PieQueueConfig { pub packet_limit: Option, pub byte_limit: Option, - pub ref_del: f64, // target delay (sec) - pub t_update: Duration, // update interval - pub tilde_alpha: f64, // base value of alpha (Hz, 1/sec) - pub tilde_beta: f64, // base value of beta (Hz, 1/sec) - pub dq_threshold: usize, // threshold of queue length (bytes) - pub epsilon: f64, // EWMA weight - pub max_burst: f64, // MAX_BURST (ms) + pub ref_del: f64, // target delay (sec) + pub max_burst: f64, // MAX_BURST (ms) #[cfg_attr( feature = "serde", serde(default, skip_serializing_if = "serde_default") @@ -42,11 +37,6 @@ impl Default for PieQueueConfig { packet_limit: None, byte_limit: None, ref_del: 0.015, // RFC 8033 - t_update: Duration::from_millis(15), - tilde_alpha: 0.125, - tilde_beta: 1.25, - dq_threshold: 16384, // 16 KB - epsilon: 0.125, max_burst: 150.0, bw_type: BwType::default(), } @@ -54,16 +44,10 @@ impl Default for PieQueueConfig { } impl PieQueueConfig { - #[allow(clippy::too_many_arguments)] pub fn new>, B: Into>>( packet_limit: A, byte_limit: B, ref_del: f64, - t_update: Duration, - tilde_alpha: f64, - tilde_beta: f64, - dq_threshold: usize, - epsilon: f64, max_burst: f64, bw_type: BwType, ) -> Self { @@ -71,11 +55,6 @@ impl PieQueueConfig { packet_limit: packet_limit.into(), byte_limit: byte_limit.into(), ref_del, - t_update, - tilde_alpha, - tilde_beta, - dq_threshold, - epsilon, max_burst, bw_type, } @@ -147,8 +126,10 @@ where self.now_bytes as f64 / self.avg_drate }; - let mut p_increment = self.config.tilde_alpha * (cur_del - self.config.ref_del) - + self.config.tilde_beta * (cur_del - self.old_del); + let tilde_alpha = 0.125; // base value of alpha (Hz, 1/sec) + let tilde_beta = 1.25; // base value of beta (Hz, 1/sec) + let mut p_increment = + tilde_alpha * (cur_del - self.config.ref_del) + tilde_beta * (cur_del - self.old_del); if self.p < 0.000001 { p_increment /= 2048.0; } else if self.p < 0.00001 { @@ -201,9 +182,10 @@ where fn update_avg_drate(&mut self, pkt_size: usize) { let now = Instant::now(); + let dq_threshold = 16384; // 16 KB // Enter a measurement cycle - if self.now_bytes > self.config.dq_threshold && self.start_measurement.is_none() { + if self.now_bytes > dq_threshold && self.start_measurement.is_none() { self.start_measurement = Some(now); self.dq_count = 0; } @@ -211,15 +193,15 @@ where // Update departure rate if we are in a measurement cycle if let Some(start) = self.start_measurement { self.dq_count += pkt_size; - if self.dq_count >= self.config.dq_threshold { + if self.dq_count >= dq_threshold { let dq_int = now.saturating_duration_since(start).as_secs_f64(); if dq_int > f64::EPSILON { let dq_rate = self.dq_count as f64 / dq_int; if self.avg_drate.abs() < f64::EPSILON { self.avg_drate = dq_rate; } else { - self.avg_drate = (1.0 - self.config.epsilon) * self.avg_drate - + self.config.epsilon * dq_rate; + let epsilon = 0.125; + self.avg_drate = (1.0 - epsilon) * self.avg_drate + epsilon * dq_rate; } self.start_measurement = Some(now); self.dq_count = 0; @@ -227,7 +209,7 @@ where } // Exit measurement cycle if queue length drops below threshold - if self.now_bytes < self.config.dq_threshold { + if self.now_bytes < dq_threshold { self.start_measurement = None; self.dq_count = 0; } @@ -253,7 +235,8 @@ where fn enqueue(&mut self, packet: P) { // Simulate time-driven with event-driven approach let interval_update = Instant::now().saturating_duration_since(self.start_update); - if interval_update >= self.config.t_update { + let t_update = Duration::from_millis(15); + if interval_update >= t_update { self.update_drop_probability(); } @@ -275,11 +258,7 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() ); - #[allow(clippy::needless_return)] - return; - } - - if self.should_drop() { + } else if self.should_drop() { #[cfg(test)] tracing::trace!( p = self.p, @@ -287,18 +266,17 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to PIE algorithm", packet.l3_length(), self.get_extra_length() ); - #[allow(clippy::needless_return)] - return; + } else { + self.now_bytes += packet_size; + self.queue.push_back(packet); } - - self.now_bytes += packet_size; - self.queue.push_back(packet); } fn dequeue(&mut self) -> Option

{ // Simulate time-driven with event-driven approach let interval_update = Instant::now().saturating_duration_since(self.start_update); - if interval_update >= self.config.t_update { + let t_update = Duration::from_millis(15); + if interval_update >= t_update { self.update_drop_probability(); } @@ -451,30 +429,28 @@ mod tests { #[test_log::test] fn test_pie_queue_avg_drate_update() { - let config = PieQueueConfig { - dq_threshold: 50, // Small threshold - ..Default::default() - }; + let config = PieQueueConfig::default(); let mut queue: PieQueue = PieQueue::new(config); - queue.enqueue(create_packet(114)); // l3 length 100 - queue.enqueue(create_packet(114)); // l3 length 100 - assert_eq!(queue.now_bytes, 200); + queue.enqueue(create_packet(10014)); // l3 length 10000 + queue.enqueue(create_packet(10014)); // l3 length 10000 + queue.enqueue(create_packet(10014)); // l3 length 10000 + assert_eq!(queue.now_bytes, 30000); // First dequeue triggers start of measurement cycle assert!(queue.start_measurement.is_none()); - queue.dequeue(); // dequeues 100 bytes + queue.dequeue(); // dequeues 10000 bytes assert!(queue.start_measurement.is_some()); - assert_eq!(queue.now_bytes, 100); + assert_eq!(queue.now_bytes, 20000); std::thread::sleep(Duration::from_millis(10)); // Second dequeue triggers calculation of avg_drate - queue.dequeue(); // dequeues 100 bytes + queue.dequeue(); // dequeues 10000 bytes assert!(queue.avg_drate > 0.0, "avg_drate should be calculated"); assert!( queue.start_measurement.is_none(), - "Should exit measurement cycle since queue is empty" + "Should exit measurement cycle since now_bytes drops below threshold" ); } @@ -487,7 +463,7 @@ mod tests { queue.avg_drate = 1000.0; queue.now_bytes = 100000; // delay = 100.0s > ref_del - std::thread::sleep(config.t_update); // Wait to exceed t_update + std::thread::sleep(Duration::from_millis(15)); // Wait to exceed t_update // This enqueue will trigger update_drop_probability() queue.enqueue(create_packet(14)); diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs index 8c2d7a9..8b54d61 100644 --- a/rattan-core/src/cells/bandwidth/queue/red.rs +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -7,7 +7,7 @@ use std::collections::VecDeque; use rand::random_range; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; -use tokio::time::{Duration, Instant}; +use tokio::time::Instant; use tracing::{debug, warn}; #[cfg(feature = "serde")] @@ -28,11 +28,6 @@ pub struct RedQueueConfig { feature = "serde", serde(default, skip_serializing_if = "serde_default") )] - pub pkt_tx_time: Duration, // typical packet tx time (us) - #[cfg_attr( - feature = "serde", - serde(default, skip_serializing_if = "serde_default") - )] pub bw_type: BwType, } @@ -45,14 +40,12 @@ impl Default for RedQueueConfig { min_th: 7500, // 5 * 1500 bytes max_th: 22500, // 15 * 1500 bytes max_p: 0.02, - pkt_tx_time: Duration::from_micros(120), // 1500 bytes * 8 / 100Mbps = 120 us bw_type: BwType::default(), } } } impl RedQueueConfig { - #[allow(clippy::too_many_arguments)] pub fn new>, B: Into>>( packet_limit: A, byte_limit: B, @@ -60,7 +53,6 @@ impl RedQueueConfig { min_th: usize, max_th: usize, max_p: f64, - pkt_tx_time: Duration, bw_type: BwType, ) -> Self { // Warning: The caller must ensure that the parameters are valid. @@ -72,9 +64,6 @@ impl RedQueueConfig { min_th, max_th ); } - if pkt_tx_time.as_micros() == 0 { - warn!("RedQueueConfig: pkt_tx_time is 0, which will cause divide-by-zero in m calculation."); - } if !(0.0..=1.0).contains(&w_q) { warn!("RedQueueConfig: w_q ({}) is out of expected range [0.0, 1.0]. This is an EWMA weight.", w_q); } @@ -89,7 +78,6 @@ impl RedQueueConfig { min_th, max_th, max_p, - pkt_tx_time, bw_type, } } @@ -148,7 +136,8 @@ where if let Some(idle_start) = self.idle_start { let now = Instant::now(); let idle_duration = now.saturating_duration_since(idle_start); - let m = idle_duration.as_micros() as f64 / self.config.pkt_tx_time.as_micros() as f64; + let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us + let m = idle_duration.as_micros() as f64 / pkt_tx_time; self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); self.idle_start = Some(now); } @@ -221,10 +210,7 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() ); - return; - } - - if self.should_drop() { + } else if self.should_drop() { #[cfg(test)] tracing::trace!( avg = self.average_queue_length, @@ -232,13 +218,11 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to RED algorithm", packet.l3_length(), self.get_extra_length() ); - #[allow(clippy::needless_return)] - return; + } else { + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; } - - self.now_bytes += packet_size; - self.queue.push_back(packet); - self.idle_start = None; } fn dequeue(&mut self) -> Option

{ From affbf9e7b473ce5a31d52d97fac94252b61afd2d Mon Sep 17 00:00:00 2001 From: duanxy23 Date: Wed, 10 Jun 2026 14:54:56 +0800 Subject: [PATCH 12/12] revert enqueue() by using early return --- rattan-core/src/cells/bandwidth/queue/ared.rs | 13 ++++++++----- rattan-core/src/cells/bandwidth/queue/pie.rs | 11 +++++++---- rattan-core/src/cells/bandwidth/queue/red.rs | 13 ++++++++----- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/rattan-core/src/cells/bandwidth/queue/ared.rs b/rattan-core/src/cells/bandwidth/queue/ared.rs index 657149d..a57aa83 100644 --- a/rattan-core/src/cells/bandwidth/queue/ared.rs +++ b/rattan-core/src/cells/bandwidth/queue/ared.rs @@ -225,7 +225,10 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() ); - } else if self.should_drop() { + return; + } + + if self.should_drop() { #[cfg(test)] tracing::trace!( avg = self.average_queue_length, @@ -233,11 +236,11 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to ARED algorithm", packet.l3_length(), self.get_extra_length() ); - } else { - self.now_bytes += packet_size; - self.queue.push_back(packet); - self.idle_start = None; + return; } + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; } fn dequeue(&mut self) -> Option

{ diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs index d2fcf17..0a75909 100644 --- a/rattan-core/src/cells/bandwidth/queue/pie.rs +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -258,7 +258,10 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() ); - } else if self.should_drop() { + return; + } + + if self.should_drop() { #[cfg(test)] tracing::trace!( p = self.p, @@ -266,10 +269,10 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to PIE algorithm", packet.l3_length(), self.get_extra_length() ); - } else { - self.now_bytes += packet_size; - self.queue.push_back(packet); + return; } + self.now_bytes += packet_size; + self.queue.push_back(packet); } fn dequeue(&mut self) -> Option

{ diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs index 8b54d61..63af4ec 100644 --- a/rattan-core/src/cells/bandwidth/queue/red.rs +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -210,7 +210,10 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() ); - } else if self.should_drop() { + return; + } + + if self.should_drop() { #[cfg(test)] tracing::trace!( avg = self.average_queue_length, @@ -218,11 +221,11 @@ where header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), "Drop packet(l3_len: {}, extra_len: {}) due to RED algorithm", packet.l3_length(), self.get_extra_length() ); - } else { - self.now_bytes += packet_size; - self.queue.push_back(packet); - self.idle_start = None; + return; } + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; } fn dequeue(&mut self) -> Option

{