diff --git a/rattan-core/src/cells/bandwidth/queue/ared.rs b/rattan-core/src/cells/bandwidth/queue/ared.rs new file mode 100644 index 0000000..a57aa83 --- /dev/null +++ b/rattan-core/src/cells/bandwidth/queue/ared.rs @@ -0,0 +1,507 @@ +// Adaptive RED Queue Implementation Reference: +// https://www.icir.org/floyd/papers/adaptiveRed.pdf +use std::collections::VecDeque; + +use rand::random_range; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use tokio::time::{Duration, Instant}; +use tracing::{debug, warn}; + +#[cfg(feature = "serde")] +use super::serde_default; +use super::{BwType, PacketQueue}; +use crate::cells::Packet; + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize), serde(default))] +#[derive(Debug, Clone)] +pub struct AdaptiveRedQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, + pub w_q: f64, // queue weight for calculating the average queue length + pub min_th: usize, // minimum threshold of average queue length + pub max_th: usize, // maximum threshold of average queue length + pub max_p: f64, // maximum probability of dropping a packet + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] + pub bw_type: BwType, +} + +impl Default for AdaptiveRedQueueConfig { + fn default() -> Self { + Self { + packet_limit: None, + byte_limit: None, + w_q: 0.002, + min_th: 7500, // 5 * 1500 bytes + max_th: 22500, // 15 * 1500 bytes + max_p: 0.02, + bw_type: BwType::default(), + } + } +} + +impl AdaptiveRedQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + w_q: f64, + min_th: usize, + max_th: usize, + max_p: f64, + bw_type: BwType, + ) -> Self { + // Warning: The caller must ensure that the parameters are valid. + // It's recommended to do validation before calling this function, + // or we may need to return a Result instead of Self in the future. + if min_th >= max_th { + warn!("AdaptiveRedQueueConfig: min_th ({}) >= max_th ({}), which may cause invalid behavior.", min_th, max_th); + } + if !(0.0..=1.0).contains(&w_q) { + warn!("AdaptiveRedQueueConfig: w_q ({}) is out of expected range [0.0, 1.0]. This is an EWMA weight.", w_q); + } + if !(0.0..=1.0).contains(&max_p) { + warn!("AdaptiveRedQueueConfig: max_p ({}) is out of expected range [0.0, 1.0]. This is a probability.", max_p); + } + + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + w_q, + min_th, + max_th, + max_p, + bw_type, + } + } +} + +impl

From for AdaptiveRedQueue

{ + fn from(config: AdaptiveRedQueueConfig) -> Self { + AdaptiveRedQueue::new(config) + } +} + +#[derive(Debug)] +pub struct AdaptiveRedQueue

{ + queue: VecDeque

, + config: AdaptiveRedQueueConfig, + now_bytes: usize, // for calculating average_queue_length + average_queue_length: f64, + count_packet: i32, // number of packets since last dropping + idle_start: Option, // start time of current idle period + latest_max_p_update: Instant, // latest time when max_p is updates +} + +impl

AdaptiveRedQueue

{ + pub fn new(config: AdaptiveRedQueueConfig) -> Self { + debug!(?config, "New AdaptiveRedQueue"); + Self { + queue: VecDeque::new(), + config, + now_bytes: 0, + average_queue_length: 0.0, + count_packet: -1, + idle_start: None, + latest_max_p_update: Instant::now(), + } + } +} + +impl

Default for AdaptiveRedQueue

+where + P: Packet, +{ + fn default() -> Self { + Self::new(AdaptiveRedQueueConfig::default()) + } +} + +impl

AdaptiveRedQueue

+where + P: Packet, +{ + fn update_avg(&mut self) { + if !self.is_empty() { + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + + self.config.w_q * (self.now_bytes as f64); + return; + } + + if let Some(idle_start) = self.idle_start { + let now = Instant::now(); + let idle_duration = now.saturating_duration_since(idle_start); + let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us + let m = idle_duration.as_micros() as f64 / pkt_tx_time; + self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); + self.idle_start = Some(now); + } + } + + fn should_drop(&mut self) -> bool { + let avg = self.average_queue_length; + let min_th = self.config.min_th as f64; + let max_th = self.config.max_th as f64; + if avg >= min_th && avg < max_th { + self.count_packet += 1; + let p_b = self.config.max_p * (avg - min_th) / (max_th - min_th); + let p_a = if self.count_packet as f64 * p_b >= 1.0 { + 1.0 + } else { + p_b / (1.0 - self.count_packet as f64 * p_b) + }; + + let rand_val = random_range(0.0..1.0); + if rand_val < p_a { + self.count_packet = 0; + true + } else { + false + } + } else if avg >= max_th { + self.count_packet = 0; + true + } else { + self.count_packet = -1; + false + } + } + + fn update_max_p(&mut self) { + let target_min = + self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64; + let target_max = + self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64; + if self.average_queue_length > target_max && self.config.max_p <= 0.5 { + self.config.max_p += (self.config.max_p / 4.0).min(0.01); + } else if self.average_queue_length < target_min && self.config.max_p >= 0.01 { + self.config.max_p *= 0.9; + } + } +} + +impl

PacketQueue

for AdaptiveRedQueue

+where + P: Packet, +{ + type Config = AdaptiveRedQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.config = config; + } + + fn is_zero_buffer(&self) -> bool { + self.config.packet_limit.is_some_and(|limit| limit == 0) + || self.config.byte_limit.is_some_and(|limit| limit == 0) + } + + fn enqueue(&mut self, packet: P) { + self.update_avg(); + + let now = Instant::now(); + if now.saturating_duration_since(self.latest_max_p_update) >= Duration::from_millis(500) { + self.update_max_p(); + self.latest_max_p_update = now; + } + + let packet_size = packet.l3_length() + self.get_extra_length(); + let pass_hard_limit = self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self + .config + .byte_limit + .is_none_or(|limit| self.now_bytes + packet_size <= limit); + + if !pass_hard_limit { + self.count_packet = 0; + #[cfg(test)] + tracing::trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() + ); + return; + } + + if self.should_drop() { + #[cfg(test)] + tracing::trace!( + avg = self.average_queue_length, + count = self.count_packet, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to ARED algorithm", packet.l3_length(), self.get_extra_length() + ); + return; + } + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; + } + + fn dequeue(&mut self) -> Option

{ + if let Some(packet) = self.queue.pop_front() { + self.now_bytes -= packet.l3_length() + self.get_extra_length(); + if self.is_empty() { + self.idle_start = Some(Instant::now()); + } + Some(packet) + } else { + None + } + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + #[inline(always)] + fn get_extra_length(&self) -> usize { + self.config.bw_type.extra_length() + } + + fn get_front_size(&self) -> Option { + self.queue + .front() + .map(|packet| self.get_packet_size(packet)) + } + + fn length(&self) -> usize { + self.queue.len() + } + + fn retain(&mut self, mut f: F) + where + F: FnMut(&P) -> bool, + { + self.queue.retain(|packet| f(packet)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cells::StdPacket; + + fn create_packet(size: usize) -> StdPacket { + let buf = vec![0u8; size]; + StdPacket::with_timestamp(&buf, Instant::now()) + } + + #[test_log::test] + fn test_ared_queue_basic() { + let config = AdaptiveRedQueueConfig { + min_th: 1000, + max_th: 2000, + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + assert!(queue.is_empty()); + + let pkt1 = create_packet(500); + queue.enqueue(pkt1); + assert!(!queue.is_empty()); + assert_eq!(queue.length(), 1); + + let dequeued = queue.dequeue(); + assert!(dequeued.is_some()); + assert!(queue.is_empty()); + } + + #[test_log::test] + fn test_ared_queue_hard_limit_packet() { + let config = AdaptiveRedQueueConfig { + packet_limit: Some(2), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + + // This one should be dropped due to packet limit + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + } + + #[test_log::test] + fn test_ared_queue_hard_limit_byte() { + let config = AdaptiveRedQueueConfig { + byte_limit: Some(150), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + queue.enqueue(create_packet(100)); // l3 length 86. + assert_eq!(queue.length(), 1); + + // This one should be dropped due to byte limit (86 + 86 > 150) + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + } + + #[test_log::test] + fn test_ared_queue_max_th_drop() { + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 200, + w_q: 1.0, // max weight, avg matches instantly + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // First packet + queue.enqueue(create_packet(100)); + + // Second packet + queue.enqueue(create_packet(300)); + + // At this point, queue length is 2, now_bytes is high enough. + // The next enqueue should see average_queue_length > max_th and drop the packet. + let before_len = queue.length(); + queue.enqueue(create_packet(100)); + assert_eq!( + queue.length(), + before_len, + "Packet should be dropped by ARED max_th" + ); + } + + #[test_log::test] + fn test_ared_queue_min_th_no_drop() { + let config = AdaptiveRedQueueConfig { + min_th: 1000, + max_th: 2000, + w_q: 1.0, // Instantly reach exact byte size + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // First packet: queue empty, avg remains 0. + queue.enqueue(create_packet(514)); // L3 size = 514 - 14 (Ethernet header) = 500 + assert_eq!(queue.length(), 1); + + // Second packet: queue has 500 bytes. w_q=1.0 makes avg = 500. + // 500 < min_th(1000), so it should not drop. + queue.enqueue(create_packet(414)); // L3 size = 400 + assert_eq!(queue.length(), 2); + + // Check internal state: count_packet is -1 when avg < min_th + assert_eq!(queue.count_packet, -1); + } + + #[test_log::test] + fn test_ared_queue_probabilistic_drop() { + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 300, + max_p: 0.5, + w_q: 1.0, + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // First packet: queue empty, avg = 0. L3 size = 200. + queue.enqueue(create_packet(214)); + assert_eq!(queue.length(), 1); + + let mut drop_count = 0; + let total_packets = 1000; + + for _ in 0..total_packets { + // enqueue packets with L3 size 0 (total size 14). + // now_bytes stays at 200. w_q=1.0 makes avg exactly 200. + // 100 (min_th) <= avg(200) < 300 (max_th), entering probabilistic drop zone. + let before = queue.length(); + queue.enqueue(create_packet(14)); + if queue.length() == before { + drop_count += 1; + } + } + + // It should drop some packets, but not all of them + assert!( + drop_count > 0, + "Should have dropped some packets probabilistically" + ); + assert!(drop_count < total_packets, "Should not drop all packets"); + } + + #[test_log::test] + fn test_ared_queue_max_p_increase() { + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 200, + max_p: 0.02, + w_q: 1.0, // Instantly update avg + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // enqueue to make average_queue_length > target_max + // target_max = min_th + 0.6 * (max_th - min_th) = 100 + 60 = 160 + // We make avg = 180 (L3 size 180, total 194) + queue.enqueue(create_packet(194)); + assert_eq!(queue.average_queue_length, 0.0); // First enqueue updates avg based on empty queue rule (avg=0). + + // Second enqueue updates avg to 180 + queue.enqueue(create_packet(14)); + assert_eq!(queue.average_queue_length, 180.0); + + // Set latest_max_p_update to 600ms ago to trigger update_max_p + queue.latest_max_p_update = Instant::now() - Duration::from_millis(600); + let before_max_p = queue.config.max_p; + + // Third enqueue triggers update_max_p + queue.enqueue(create_packet(14)); + + let after_max_p = queue.config.max_p; + assert!( + after_max_p > before_max_p, + "max_p should increase when avg > target_max" + ); + } + + #[test_log::test] + fn test_ared_queue_max_p_decrease() { + let config = AdaptiveRedQueueConfig { + min_th: 100, + max_th: 200, + max_p: 0.05, // Starting with a high max_p + w_q: 1.0, + ..Default::default() + }; + let mut queue: AdaptiveRedQueue = AdaptiveRedQueue::new(config); + + // enqueue to make average_queue_length < target_min + // target_min = min_th + 0.4 * (max_th - min_th) = 100 + 40 = 140 + // We make avg = 120 (L3 size 120, total 134) + queue.enqueue(create_packet(134)); + assert_eq!(queue.average_queue_length, 0.0); + + queue.enqueue(create_packet(14)); + assert_eq!(queue.average_queue_length, 120.0); + + // Set latest_max_p_update to 600ms ago + queue.latest_max_p_update = Instant::now() - Duration::from_millis(600); + let before_max_p = queue.config.max_p; + + queue.enqueue(create_packet(14)); + + let after_max_p = queue.config.max_p; + assert!( + after_max_p < before_max_p, + "max_p should decrease when avg < target_min" + ); + } +} diff --git a/rattan-core/src/cells/bandwidth/queue/mod.rs b/rattan-core/src/cells/bandwidth/queue/mod.rs index eecaf82..f4c9c22 100644 --- a/rattan-core/src/cells/bandwidth/queue/mod.rs +++ b/rattan-core/src/cells/bandwidth/queue/mod.rs @@ -11,15 +11,21 @@ use tokio::time::Instant; use super::BwType; use crate::cells::{Packet, LARGE_DURATION}; +mod ared; mod codel; mod drophead; mod droptail; mod infinite; +mod pie; +mod red; +pub use ared::*; pub use codel::*; pub use drophead::*; pub use droptail::*; pub use infinite::*; +pub use pie::*; +pub use red::*; #[cfg(feature = "serde")] fn serde_default(t: &T) -> bool { diff --git a/rattan-core/src/cells/bandwidth/queue/pie.rs b/rattan-core/src/cells/bandwidth/queue/pie.rs new file mode 100644 index 0000000..0a75909 --- /dev/null +++ b/rattan-core/src/cells/bandwidth/queue/pie.rs @@ -0,0 +1,479 @@ +// PIE Queue Implementation Reference: +// https://www.rfc-editor.org/info/rfc8033 +// https://ieeexplore.ieee.org/document/6602305 +// Reproduced according to RFC 8033 Appendix B, +// rather than original paper or RFC Appendix A. + +use std::collections::VecDeque; + +use rand::random_range; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use tokio::time::{Duration, Instant}; +use tracing::debug; + +#[cfg(feature = "serde")] +use super::serde_default; +use super::{BwType, PacketQueue}; +use crate::cells::Packet; + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize), serde(default))] +#[derive(Debug, Clone)] +pub struct PieQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, + pub ref_del: f64, // target delay (sec) + pub max_burst: f64, // MAX_BURST (ms) + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] + pub bw_type: BwType, +} + +impl Default for PieQueueConfig { + fn default() -> Self { + Self { + packet_limit: None, + byte_limit: None, + ref_del: 0.015, // RFC 8033 + max_burst: 150.0, + bw_type: BwType::default(), + } + } +} + +impl PieQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + ref_del: f64, + max_burst: f64, + bw_type: BwType, + ) -> Self { + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + ref_del, + max_burst, + bw_type, + } + } +} + +impl

From for PieQueue

{ + fn from(config: PieQueueConfig) -> Self { + PieQueue::new(config) + } +} + +#[derive(Debug)] +pub struct PieQueue

{ + queue: VecDeque

, + config: PieQueueConfig, + now_bytes: usize, + old_del: f64, // previous delay (sec) + p: f64, // current drop probability + dq_count: usize, // departure count (bytes) + start_update: Instant, // start time of t_update + start_measurement: Option, // Some(Instant) when in a measurement cycle, None when quit + avg_drate: f64, + burst_allowance: f64, +} + +impl

PieQueue

{ + pub fn new(config: PieQueueConfig) -> Self { + debug!(?config, "New PieQueue"); + let max_burst = config.max_burst; + Self { + queue: VecDeque::new(), + config, + now_bytes: 0, + old_del: 0.0, + p: 0.0, + dq_count: 0, + start_update: Instant::now(), + start_measurement: None, + avg_drate: 0.0, + burst_allowance: max_burst, + } + } +} + +impl

Default for PieQueue

+where + P: Packet, +{ + fn default() -> Self { + Self::new(PieQueueConfig::default()) + } +} + +impl

PieQueue

+where + P: Packet, +{ + fn update_drop_probability(&mut self) { + let now = Instant::now(); + let elapsed_ms = now + .saturating_duration_since(self.start_update) + .as_secs_f64() + * 1000.0; + + let cur_del = if self.avg_drate.abs() < f64::EPSILON { + 0.0 + } else { + self.now_bytes as f64 / self.avg_drate + }; + + let tilde_alpha = 0.125; // base value of alpha (Hz, 1/sec) + let tilde_beta = 1.25; // base value of beta (Hz, 1/sec) + let mut p_increment = + tilde_alpha * (cur_del - self.config.ref_del) + tilde_beta * (cur_del - self.old_del); + if self.p < 0.000001 { + p_increment /= 2048.0; + } else if self.p < 0.00001 { + p_increment /= 512.0; + } else if self.p < 0.0001 { + p_increment /= 128.0; + } else if self.p < 0.001 { + p_increment /= 32.0; + } else if self.p < 0.01 { + p_increment /= 8.0; + } else if self.p < 0.1 { + p_increment /= 2.0; + } + self.p += p_increment; + // RFC 8033 Section 4.2: Exponential decay when system is not congested + if cur_del < self.config.ref_del / 2.0 && self.old_del < self.config.ref_del / 2.0 { + self.p *= 0.98; + } + self.p = self.p.clamp(0.0, 1.0); + + // RFC 8033 Section 4.4: Burst Tolerance + if self.p < f64::EPSILON + && cur_del < self.config.ref_del / 2.0 + && self.old_del < self.config.ref_del / 2.0 + { + self.burst_allowance = self.config.max_burst; + } else { + self.burst_allowance = (self.burst_allowance - elapsed_ms).max(0.0); + } + self.old_del = cur_del; + self.start_update = now; + } + + fn should_drop(&self) -> bool { + // RFC 8033 Section 4.4: Enqueue packet bypassing random drop if burst_allow > 0 + if self.burst_allowance > f64::EPSILON { + return false; + } + + // RFC 8033 Section 4.1: Bypass random drop logic to be work conserving + let bypass_drop = + (self.old_del < self.config.ref_del / 2.0 && self.p < 0.2) || self.queue.len() <= 2; + if bypass_drop { + return false; + } + + let rand_val = random_range(0.0..1.0); + rand_val < self.p + } + + fn update_avg_drate(&mut self, pkt_size: usize) { + let now = Instant::now(); + let dq_threshold = 16384; // 16 KB + + // Enter a measurement cycle + if self.now_bytes > dq_threshold && self.start_measurement.is_none() { + self.start_measurement = Some(now); + self.dq_count = 0; + } + + // Update departure rate if we are in a measurement cycle + if let Some(start) = self.start_measurement { + self.dq_count += pkt_size; + if self.dq_count >= dq_threshold { + let dq_int = now.saturating_duration_since(start).as_secs_f64(); + if dq_int > f64::EPSILON { + let dq_rate = self.dq_count as f64 / dq_int; + if self.avg_drate.abs() < f64::EPSILON { + self.avg_drate = dq_rate; + } else { + let epsilon = 0.125; + self.avg_drate = (1.0 - epsilon) * self.avg_drate + epsilon * dq_rate; + } + self.start_measurement = Some(now); + self.dq_count = 0; + } + } + + // Exit measurement cycle if queue length drops below threshold + if self.now_bytes < dq_threshold { + self.start_measurement = None; + self.dq_count = 0; + } + } + } +} + +impl

PacketQueue

for PieQueue

+where + P: Packet, +{ + type Config = PieQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.config = config; + } + + fn is_zero_buffer(&self) -> bool { + self.config.packet_limit.is_some_and(|limit| limit == 0) + || self.config.byte_limit.is_some_and(|limit| limit == 0) + } + + fn enqueue(&mut self, packet: P) { + // Simulate time-driven with event-driven approach + let interval_update = Instant::now().saturating_duration_since(self.start_update); + let t_update = Duration::from_millis(15); + if interval_update >= t_update { + self.update_drop_probability(); + } + + let packet_size = packet.l3_length() + self.get_extra_length(); + let pass_hard_limit = self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self + .config + .byte_limit + .is_none_or(|limit| self.now_bytes + packet_size <= limit); + + if !pass_hard_limit { + #[cfg(test)] + tracing::trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.get_extra_length() + ); + return; + } + + if self.should_drop() { + #[cfg(test)] + tracing::trace!( + p = self.p, + old_delay = self.old_del, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to PIE algorithm", packet.l3_length(), self.get_extra_length() + ); + return; + } + self.now_bytes += packet_size; + self.queue.push_back(packet); + } + + fn dequeue(&mut self) -> Option

{ + // Simulate time-driven with event-driven approach + let interval_update = Instant::now().saturating_duration_since(self.start_update); + let t_update = Duration::from_millis(15); + if interval_update >= t_update { + self.update_drop_probability(); + } + + if let Some(packet) = self.queue.pop_front() { + let pkt_size = packet.l3_length() + self.get_extra_length(); + self.now_bytes -= pkt_size; + self.update_avg_drate(pkt_size); + Some(packet) + } else { + None + } + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + #[inline(always)] + fn get_extra_length(&self) -> usize { + self.config.bw_type.extra_length() + } + + fn get_front_size(&self) -> Option { + self.queue + .front() + .map(|packet| self.get_packet_size(packet)) + } + + fn length(&self) -> usize { + self.queue.len() + } + + fn retain(&mut self, mut f: F) + where + F: FnMut(&P) -> bool, + { + self.queue.retain(|packet| f(packet)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cells::StdPacket; + + fn create_packet(size: usize) -> StdPacket { + let buf = vec![0u8; size]; + StdPacket::with_timestamp(&buf, Instant::now()) + } + + #[test_log::test] + fn test_pie_queue_basic() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config); + assert!(queue.is_empty()); + + let pkt1 = create_packet(500); + queue.enqueue(pkt1); + assert!(!queue.is_empty()); + assert_eq!(queue.length(), 1); + + let dequeued = queue.dequeue(); + assert!(dequeued.is_some()); + assert!(queue.is_empty()); + } + + #[test_log::test] + fn test_pie_queue_hard_limit_packet() { + let config = PieQueueConfig { + packet_limit: Some(2), + ..Default::default() + }; + let mut queue: PieQueue = PieQueue::new(config); + + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + + // This one should be dropped due to packet limit + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + } + + #[test_log::test] + fn test_pie_queue_hard_limit_byte() { + let config = PieQueueConfig { + byte_limit: Some(150), + ..Default::default() + }; + let mut queue: PieQueue = PieQueue::new(config); + + queue.enqueue(create_packet(100)); // l3 length 86. + assert_eq!(queue.length(), 1); + + // This one should be dropped due to byte limit (86 + 86 > 150) + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + } + + #[test_log::test] + fn test_pie_queue_burst_allowance() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config); + + // Force a high drop probability + queue.p = 1.0; + queue.burst_allowance = 100.0; + + // burst_allowance > 0 bypasses random drop + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + + // Fill queue > 2 to bypass work conserving logic later + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + + queue.burst_allowance = 0.0; + // With burst_allowance = 0.0, queue > 2, and p = 1.0, it should drop + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + } + + #[test_log::test] + fn test_pie_queue_work_conserving() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config.clone()); + queue.p = 1.0; + queue.burst_allowance = 0.0; + + // bypass_drop handles queue.len() <= 2 + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + + // For the 4th element, queue.len() is 3, so it does not bypass based on length + // Since p = 1.0, it drops + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 3); + + // Now test bypass_drop condition: old_del < ref_del/2 and p < 0.2 + queue.p = 0.15; + queue.old_del = config.ref_del / 3.0; + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 4); + } + + #[test_log::test] + fn test_pie_queue_avg_drate_update() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config); + + queue.enqueue(create_packet(10014)); // l3 length 10000 + queue.enqueue(create_packet(10014)); // l3 length 10000 + queue.enqueue(create_packet(10014)); // l3 length 10000 + assert_eq!(queue.now_bytes, 30000); + + // First dequeue triggers start of measurement cycle + assert!(queue.start_measurement.is_none()); + queue.dequeue(); // dequeues 10000 bytes + assert!(queue.start_measurement.is_some()); + assert_eq!(queue.now_bytes, 20000); + + std::thread::sleep(Duration::from_millis(10)); + + // Second dequeue triggers calculation of avg_drate + queue.dequeue(); // dequeues 10000 bytes + assert!(queue.avg_drate > 0.0, "avg_drate should be calculated"); + assert!( + queue.start_measurement.is_none(), + "Should exit measurement cycle since now_bytes drops below threshold" + ); + } + + #[test_log::test] + fn test_pie_queue_update_drop_probability() { + let config = PieQueueConfig::default(); + let mut queue: PieQueue = PieQueue::new(config.clone()); + + // Fake high delay + queue.avg_drate = 1000.0; + queue.now_bytes = 100000; // delay = 100.0s > ref_del + + std::thread::sleep(Duration::from_millis(15)); // Wait to exceed t_update + + // This enqueue will trigger update_drop_probability() + queue.enqueue(create_packet(14)); + + assert!( + queue.p > 0.0, + "Probability should increase when delay is high" + ); + } +} diff --git a/rattan-core/src/cells/bandwidth/queue/red.rs b/rattan-core/src/cells/bandwidth/queue/red.rs new file mode 100644 index 0000000..63af4ec --- /dev/null +++ b/rattan-core/src/cells/bandwidth/queue/red.rs @@ -0,0 +1,424 @@ +// RED Queue Implementation Reference: +// https://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=251892 +// https://github.com/torvalds/linux/blob/master/include/net/red.h + +use std::collections::VecDeque; + +use rand::random_range; +#[cfg(feature = "serde")] +use serde::{Deserialize, Serialize}; +use tokio::time::Instant; +use tracing::{debug, warn}; + +#[cfg(feature = "serde")] +use super::serde_default; +use super::{BwType, PacketQueue}; +use crate::cells::Packet; + +#[cfg_attr(feature = "serde", derive(Deserialize, Serialize), serde(default))] +#[derive(Debug, Clone)] +pub struct RedQueueConfig { + pub packet_limit: Option, + pub byte_limit: Option, + pub w_q: f64, // queue weight for calculating the average queue length + pub min_th: usize, // minimum threshold of average queue length + pub max_th: usize, // maximum threshold of average queue length + pub max_p: f64, // maximum probability of dropping a packet + #[cfg_attr( + feature = "serde", + serde(default, skip_serializing_if = "serde_default") + )] + pub bw_type: BwType, +} + +impl Default for RedQueueConfig { + fn default() -> Self { + Self { + packet_limit: None, + byte_limit: None, + w_q: 0.002, + min_th: 7500, // 5 * 1500 bytes + max_th: 22500, // 15 * 1500 bytes + max_p: 0.02, + bw_type: BwType::default(), + } + } +} + +impl RedQueueConfig { + pub fn new>, B: Into>>( + packet_limit: A, + byte_limit: B, + w_q: f64, + min_th: usize, + max_th: usize, + max_p: f64, + bw_type: BwType, + ) -> Self { + // Warning: The caller must ensure that the parameters are valid. + // It's recommended to do validation before calling this function, + // or we may need to return a Result instead of Self in the future. + if min_th >= max_th { + warn!( + "RedQueueConfig: min_th ({}) >= max_th ({}), which may cause invalid behavior.", + min_th, max_th + ); + } + if !(0.0..=1.0).contains(&w_q) { + warn!("RedQueueConfig: w_q ({}) is out of expected range [0.0, 1.0]. This is an EWMA weight.", w_q); + } + if !(0.0..=1.0).contains(&max_p) { + warn!("RedQueueConfig: max_p ({}) is out of expected range [0.0, 1.0]. This is a probability.", max_p); + } + + Self { + packet_limit: packet_limit.into(), + byte_limit: byte_limit.into(), + w_q, + min_th, + max_th, + max_p, + bw_type, + } + } +} + +impl

From for RedQueue

{ + fn from(config: RedQueueConfig) -> Self { + RedQueue::new(config) + } +} + +#[derive(Debug)] +pub struct RedQueue

{ + queue: VecDeque

, + config: RedQueueConfig, + now_bytes: usize, // for calculating average_queue_length + average_queue_length: f64, + count_packet: i32, // number of packets since last dropping + idle_start: Option, // start time of current idle period +} + +impl

RedQueue

{ + pub fn new(config: RedQueueConfig) -> Self { + debug!(?config, "New RedQueue"); + Self { + queue: VecDeque::new(), + config, + now_bytes: 0, + average_queue_length: 0.0, + count_packet: -1, + idle_start: None, + } + } +} + +impl

Default for RedQueue

+where + P: Packet, +{ + fn default() -> Self { + Self::new(RedQueueConfig::default()) + } +} + +impl

RedQueue

+where + P: Packet, +{ + fn update_avg(&mut self) { + if !self.is_empty() { + self.average_queue_length = (1.0 - self.config.w_q) * self.average_queue_length + + self.config.w_q * (self.now_bytes as f64); + return; + } + + if let Some(idle_start) = self.idle_start { + let now = Instant::now(); + let idle_duration = now.saturating_duration_since(idle_start); + let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us + let m = idle_duration.as_micros() as f64 / pkt_tx_time; + self.average_queue_length *= f64::powf(1.0 - self.config.w_q, m); + self.idle_start = Some(now); + } + } + + fn should_drop(&mut self) -> bool { + let avg = self.average_queue_length; + let min_th = self.config.min_th as f64; + let max_th = self.config.max_th as f64; + if avg >= min_th && avg < max_th { + self.count_packet += 1; + let p_b = self.config.max_p * (avg - min_th) / (max_th - min_th); + let p_a = if self.count_packet as f64 * p_b >= 1.0 { + 1.0 + } else { + p_b / (1.0 - self.count_packet as f64 * p_b) + }; + + let rand_val = random_range(0.0..1.0); + if rand_val < p_a { + self.count_packet = 0; + true + } else { + false + } + } else if avg >= max_th { + self.count_packet = 0; + true + } else { + self.count_packet = -1; + false + } + } +} + +impl

PacketQueue

for RedQueue

+where + P: Packet, +{ + type Config = RedQueueConfig; + + fn configure(&mut self, config: Self::Config) { + self.config = config; + } + + fn is_zero_buffer(&self) -> bool { + self.config.packet_limit.is_some_and(|limit| limit == 0) + || self.config.byte_limit.is_some_and(|limit| limit == 0) + } + + fn enqueue(&mut self, packet: P) { + self.update_avg(); + + let packet_size = packet.l3_length() + self.get_extra_length(); + let pass_hard_limit = self + .config + .packet_limit + .is_none_or(|limit| self.queue.len() < limit) + && self + .config + .byte_limit + .is_none_or(|limit| self.now_bytes + packet_size <= limit); + + if !pass_hard_limit { + self.count_packet = 0; + #[cfg(test)] + tracing::trace!( + queue_len = self.queue.len(), + now_bytes = self.now_bytes, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to hard limit", packet.l3_length(), self.config.bw_type.extra_length() + ); + return; + } + + if self.should_drop() { + #[cfg(test)] + tracing::trace!( + avg = self.average_queue_length, + count = self.count_packet, + header = ?format!("{:X?}", &packet.as_slice()[0..std::cmp::min(56, packet.length())]), + "Drop packet(l3_len: {}, extra_len: {}) due to RED algorithm", packet.l3_length(), self.get_extra_length() + ); + return; + } + self.now_bytes += packet_size; + self.queue.push_back(packet); + self.idle_start = None; + } + + fn dequeue(&mut self) -> Option

{ + if let Some(packet) = self.queue.pop_front() { + self.now_bytes -= packet.l3_length() + self.get_extra_length(); + if self.is_empty() { + self.idle_start = Some(Instant::now()); + } + Some(packet) + } else { + None + } + } + + fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + #[inline(always)] + fn get_extra_length(&self) -> usize { + self.config.bw_type.extra_length() + } + + fn get_front_size(&self) -> Option { + self.queue + .front() + .map(|packet| self.get_packet_size(packet)) + } + + fn length(&self) -> usize { + self.queue.len() + } + + fn retain(&mut self, mut f: F) + where + F: FnMut(&P) -> bool, + { + self.queue.retain(|packet| f(packet)); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::cells::StdPacket; + + fn create_packet(size: usize) -> StdPacket { + let buf = vec![0u8; size]; + StdPacket::with_timestamp(&buf, Instant::now()) + } + + #[test_log::test] + fn test_red_queue_basic() { + let config = RedQueueConfig { + min_th: 1000, + max_th: 2000, + ..Default::default() + }; + let mut queue: RedQueue = RedQueue::new(config); + + assert!(queue.is_empty()); + + let pkt1 = create_packet(500); + queue.enqueue(pkt1); + assert!(!queue.is_empty()); + assert_eq!(queue.length(), 1); + + let dequeued = queue.dequeue(); + assert!(dequeued.is_some()); + assert!(queue.is_empty()); + } + + #[test_log::test] + fn test_red_queue_hard_limit_packet() { + let config = RedQueueConfig { + packet_limit: Some(2), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; + let mut queue: RedQueue = RedQueue::new(config); + + queue.enqueue(create_packet(100)); + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + + // This one should be dropped due to packet limit + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 2); + } + + #[test_log::test] + fn test_red_queue_hard_limit_byte() { + let config = RedQueueConfig { + byte_limit: Some(150), + min_th: 100000, // avoid red drop + max_th: 200000, + ..Default::default() + }; + let mut queue: RedQueue = RedQueue::new(config); + + queue.enqueue(create_packet(100)); // l3 length 86. + assert_eq!(queue.length(), 1); + + // This one should be dropped due to byte limit (86 + 86 > 150) + queue.enqueue(create_packet(100)); + assert_eq!(queue.length(), 1); + } + + #[test_log::test] + fn test_red_queue_max_th_drop() { + let config = RedQueueConfig { + min_th: 100, + max_th: 200, + w_q: 1.0, // max weight, avg matches instantly + ..Default::default() + }; + let mut queue: RedQueue = RedQueue::new(config); + + // First packet + queue.enqueue(create_packet(100)); + + // Second packet + queue.enqueue(create_packet(300)); + + // At this point, queue length is 2, now_bytes is high enough. + // The next enqueue should see average_queue_length > max_th and drop the packet. + let before_len = queue.length(); + queue.enqueue(create_packet(100)); + assert_eq!( + queue.length(), + before_len, + "Packet should be dropped by RED max_th" + ); + } + + #[test_log::test] + fn test_red_queue_min_th_no_drop() { + let config = RedQueueConfig { + min_th: 1000, + max_th: 2000, + w_q: 1.0, // Instantly reach exact byte size + ..Default::default() + }; + let mut queue: RedQueue = RedQueue::new(config); + + // First packet: queue empty, avg remains 0. + queue.enqueue(create_packet(514)); // L3 size = 514 - 14 (Ethernet header) = 500 + assert_eq!(queue.length(), 1); + + // Second packet: queue has 500 bytes. w_q=1.0 makes avg = 500. + // 500 < min_th(1000), so it should not drop. + queue.enqueue(create_packet(414)); // L3 size = 400 + assert_eq!(queue.length(), 2); + + // Check internal state: count_packet is -1 when avg < min_th + assert_eq!(queue.count_packet, -1); + } + + #[test_log::test] + fn test_red_queue_probabilistic_drop() { + let config = RedQueueConfig { + min_th: 100, + max_th: 300, + max_p: 0.5, + w_q: 1.0, + ..Default::default() + }; + let mut queue: RedQueue = RedQueue::new(config); + + // First packet: queue empty, avg = 0. L3 size = 200. + queue.enqueue(create_packet(214)); + assert_eq!(queue.length(), 1); + + let mut drop_count = 0; + let total_packets = 1000; + + for _ in 0..total_packets { + // enqueue packets with L3 size 0 (total size 14). + // now_bytes stays at 200. w_q=1.0 makes avg exactly 200. + // 100 (min_th) <= avg(200) < 300 (max_th), entering probabilistic drop zone. + let before = queue.length(); + queue.enqueue(create_packet(14)); + if queue.length() == before { + drop_count += 1; + } + } + + // It should drop some packets, but not all of them + assert!( + drop_count > 0, + "Should have dropped some packets probabilistically" + ); + assert!(drop_count < total_packets, "Should not drop all packets"); + } +}