Add RED, ARED and PIE #197
un-lock-able
left a comment
Please also remember to fix the warnings from cargo clippy (reported in GitHub Actions) and run cargo fmt after making changes.
Pull request overview
Adds three Active Queue Management (AQM) queue implementations (RED, Adaptive RED, and PIE) to rattan-core’s bandwidth cell queue module, enabling more realistic congestion signaling behavior than the existing tail/head drop queues.
Changes:
- Introduces new queue implementations: RedQueue, AdaptiveRedQueue, and PieQueue, each with its own config struct and unit tests.
- Exposes the new queues from rattan-core/src/cells/bandwidth/queue/mod.rs so they can be used by the rest of the crate.
- Adds algorithm-specific state tracking (avg queue length / drop probability / burst allowance / rate estimation) and hard-limit enforcement (packet/byte limits).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| rattan-core/src/cells/bandwidth/queue/mod.rs | Registers and re-exports the new AQM queue modules. |
| rattan-core/src/cells/bandwidth/queue/red.rs | Adds RED queue implementation + tests. |
| rattan-core/src/cells/bandwidth/queue/ared.rs | Adds Adaptive RED (ARED) implementation + tests. |
| rattan-core/src/cells/bandwidth/queue/pie.rs | Adds PIE implementation + tests. |
| fn retain<F>(&mut self, mut f: F) | ||
| where | ||
| F: FnMut(&P) -> bool, | ||
| { | ||
| self.queue.retain(|packet| f(packet)); | ||
| } |
| fn update_max_p(&mut self) { | ||
| let target_min = | ||
| self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64; | ||
| let target_max = | ||
| self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64; | ||
| if self.average_queue_length > target_max && self.config.max_p <= 0.5 { | ||
| self.config.max_p += (self.config.max_p / 4.0).min(0.01); | ||
| } else if self.average_queue_length < target_min && self.config.max_p >= 0.01 { | ||
| self.config.max_p *= 0.9; | ||
| } | ||
| } |
The ARED paper asks us to constrain max_p to the range [0.01, 0.5].
Your implementation pushes 0.4999 to about 0.5099, and 0.01 to 0.009. Let's implement that in a simpler way.
| fn update_max_p(&mut self) { | |
| let target_min = | |
| self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64; | |
| let target_max = | |
| self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64; | |
| if self.average_queue_length > target_max && self.config.max_p <= 0.5 { | |
| self.config.max_p += (self.config.max_p / 4.0).min(0.01); | |
| } else if self.average_queue_length < target_min && self.config.max_p >= 0.01 { | |
| self.config.max_p *= 0.9; | |
| } | |
| } | |
| fn update_max_p(&mut self) { | |
| let target_min = | |
| self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64; | |
| let target_max = | |
| self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64; | |
| if self.average_queue_length > target_max { | |
| self.config.max_p += (self.config.max_p / 4.0).min(0.01); | |
| } else if self.average_queue_length < target_min { | |
| self.config.max_p *= 0.9; | |
| } | |
| // Clamp to [0.01, 0.5] | |
| self.config.max_p = self.config.max_p.clamp(0.01, 0.5); | |
| } |
| p_b / (1.0 - self.count_packet as f64 * p_b) | ||
| }; | ||
|
|
||
| let rand_val = random_range(0.0..1.0); |
This adds non-deterministic behavior. You can refer to how we used a seeded random generator to make the queue's behavior deterministic.
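As a rough illustration of what a seeded generator buys, the sketch below uses a small SplitMix64 generator written with the standard library only. It is a stand-in, not the generator the project uses (that is not shown in this thread); `SplitMix64` and `drop_decisions` are hypothetical names.

```rust
/// Minimal SplitMix64 generator, used here only as a std-only stand-in
/// for a seeded generator from a crate such as `rand`.
struct SplitMix64 {
    state: u64,
}

impl SplitMix64 {
    fn new(seed: u64) -> Self {
        Self { state: seed }
    }

    fn next_u64(&mut self) -> u64 {
        self.state = self.state.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.state;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }

    /// Uniform f64 in [0, 1), built from the top 53 bits.
    fn next_f64(&mut self) -> f64 {
        (self.next_u64() >> 11) as f64 / (1u64 << 53) as f64
    }
}

/// Hypothetical drop decision: drop when the sample falls below `p`.
fn drop_decisions(seed: u64, p: f64, n: usize) -> Vec<bool> {
    let mut rng = SplitMix64::new(seed);
    (0..n).map(|_| rng.next_f64() < p).collect()
}

fn main() {
    // The same seed always yields the same sequence of decisions.
    let a = drop_decisions(42, 0.1, 1000);
    let b = drop_decisions(42, 0.1, 1000);
    assert_eq!(a, b);
    println!("drops: {}", a.iter().filter(|d| **d).count());
}
```

Storing the generator as a field of the queue, seeded from the config, would make every run with the same input reproducible.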
| assert!( | ||
| drop_count > 0, | ||
| "Should have dropped some packets probabilistically" | ||
| ); |
It's not enough to check only that some packets, but not all of them, are dropped.
Try to calculate how many packets (an exact number or a range) are expected to be dropped. Then, when the queue's implementation is modified later, the automated test gives us more confidence that there is no unexpected change in behavior.
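One possible way to derive such a range, assuming each packet is dropped independently with a fixed probability `p`, is a binomial bound: the mean is `n * p` and the standard deviation is `sqrt(n * p * (1 - p))`. The helper names below are hypothetical, and the xorshift64* generator is only a std-only stand-in for a seeded generator. The independence assumption does not hold for RED's count-based spacing, so treat this as a sketch of the idea rather than the exact bound for that queue.

```rust
/// Range of drop counts within `k_sigma` standard deviations of the mean,
/// assuming independent drops with probability `p` (a binomial model).
fn expected_drop_range(n: u64, p: f64, k_sigma: f64) -> (u64, u64) {
    let mean = n as f64 * p;
    let sd = (n as f64 * p * (1.0 - p)).sqrt();
    let lo = (mean - k_sigma * sd).floor().max(0.0) as u64;
    let hi = (mean + k_sigma * sd).ceil().min(n as f64) as u64;
    (lo, hi)
}

/// One xorshift64* step mapped to [0, 1); `state` must be non-zero.
fn next_unit(state: &mut u64) -> f64 {
    *state ^= *state >> 12;
    *state ^= *state << 25;
    *state ^= *state >> 27;
    let out = state.wrapping_mul(0x2545_F491_4F6C_DD1D);
    (out >> 11) as f64 / (1u64 << 53) as f64
}

/// Count simulated drops for a fixed seed, so the result is reproducible.
fn count_drops(seed: u64, n: u64, p: f64) -> u64 {
    let mut state = seed;
    (0..n).filter(|_| next_unit(&mut state) < p).count() as u64
}

fn main() {
    let (n, p) = (1000, 0.1);
    let (lo, hi) = expected_drop_range(n, p, 5.0);
    let drops = count_drops(0x9E37_79B9_7F4A_7C15, n, p);
    assert!(lo <= drops && drops <= hi, "{drops} not in [{lo}, {hi}]");
    println!("{drops} drops, expected within [{lo}, {hi}]");
}
```

With a seeded generator the count is fixed for a given seed, so a test could also pin the exact value once it has been observed and reviewed.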
Lethe10137
left a comment
Please refer to #155 about the "logical timestamp". Conceptually, these time-based queues should avoid calling Instant::now() and instead use the timestamp of the packets, which is when the packet "should" be here.
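A minimal sketch of the idea follows. The trait `Timestamped` and the types `TestPacket` and `UpdateClock` are hypothetical stand-ins, not the crate's real packet trait or queue; the only point is that time advances from the packet's timestamp, so a test can move time forward without sleeping.

```rust
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the crate's packet trait.
trait Timestamped {
    fn get_timestamp(&self) -> Instant;
}

struct TestPacket {
    ts: Instant,
}

impl Timestamped for TestPacket {
    fn get_timestamp(&self) -> Instant {
        self.ts
    }
}

/// Tracks periodic updates driven purely by packet timestamps.
struct UpdateClock {
    start_update: Instant,
    t_update: Duration,
    updates: u32,
}

impl UpdateClock {
    fn new(start: Instant, t_update: Duration) -> Self {
        Self { start_update: start, t_update, updates: 0 }
    }

    /// Advance using the packet's logical time; never calls Instant::now().
    /// Catches up one interval at a time if several intervals have elapsed.
    fn on_packet<P: Timestamped>(&mut self, packet: &P) {
        let now = packet.get_timestamp();
        while now.saturating_duration_since(self.start_update) >= self.t_update {
            self.start_update += self.t_update;
            self.updates += 1; // a real queue would update its state here
        }
    }
}

fn main() {
    let t0 = Instant::now(); // only anchors the test timeline
    let mut clock = UpdateClock::new(t0, Duration::from_millis(15));

    clock.on_packet(&TestPacket { ts: t0 + Duration::from_millis(14) });
    assert_eq!(clock.updates, 0);

    // 46 ms of logical time covers the updates due at 15, 30 and 45 ms.
    clock.on_packet(&TestPacket { ts: t0 + Duration::from_millis(46) });
    assert_eq!(clock.updates, 3);
}
```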
| return false; | ||
| } | ||
|
|
||
| let rand_val = random_range(0.0..1.0); |
Also, try to make this deterministic with a seeded random generator.
|
|
||
| fn update_avg_drate(&mut self, pkt_size: usize) { | ||
| let now = Instant::now(); | ||
| let dq_threshold = 16384; // 16 KB |
16 KiB
| let dq_threshold = 16384; // 16 KB | |
| let dq_threshold = 16384; // 16 KiB |
| P: Packet, | ||
| { | ||
| fn update_drop_probability(&mut self) { | ||
| let now = Instant::now(); |
now should be a parameter passed in: the logical timestamp of the packet that triggered this update.
| let interval_update = Instant::now().saturating_duration_since(self.start_update); | ||
| let t_update = Duration::from_millis(15); | ||
| if interval_update >= t_update { | ||
| self.update_drop_probability(); |
Pass packet.get_timestamp() as an argument. The queue should use this as the "logical time", rather than Instant::now().
| std::thread::sleep(Duration::from_millis(15)); // Wait to exceed t_update | ||
|
|
||
| // This enqueue will trigger update_drop_probability() | ||
| queue.enqueue(create_packet(14)); |
If the queue were working based on the logical timestamp (the timestamp of the packets), you could just modify the timestamp of the packets.
| queue.enqueue(create_packet(14)); | ||
|
|
||
| assert!( | ||
| queue.p > 0.0, |
What probability should it be updated to? Please make this assertion stricter.
| } | ||
|
|
||
| if let Some(idle_start) = self.idle_start { | ||
| let now = Instant::now(); |
Also, try to use the logical timestamp instead.
| fn enqueue(&mut self, packet: P) { | ||
| // Simulate time-driven with event-driven approach | ||
| let interval_update = Instant::now().saturating_duration_since(self.start_update); | ||
| let t_update = Duration::from_millis(15); |
Please make this update interval configurable, with a default of 15 ms, according to RFC 8033:
The update interval, T_UPDATE, is defaulted to be 15 milliseconds.
It MAY be reduced on high-speed links in order to provide smoother
response. The target latency value, QDELAY_REF, SHOULD be set to 15
milliseconds.
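One way this could look is a config struct whose `Default` carries the values quoted above. `PieTimingConfig` and its field names are hypothetical and are not taken from the PR's actual config struct.

```rust
use std::time::Duration;

/// Hypothetical timing config; field names are illustrative only.
#[derive(Debug, Clone, PartialEq)]
struct PieTimingConfig {
    /// Interval between drop-probability updates (T_UPDATE).
    t_update: Duration,
    /// Target queueing latency (QDELAY_REF).
    qdelay_ref: Duration,
}

impl Default for PieTimingConfig {
    fn default() -> Self {
        Self {
            t_update: Duration::from_millis(15),
            qdelay_ref: Duration::from_millis(15),
        }
    }
}

fn main() {
    let default = PieTimingConfig::default();

    // A high-speed link may override only the update interval.
    let fast_link = PieTimingConfig {
        t_update: Duration::from_millis(5),
        ..Default::default()
    };

    println!("{default:?}");
    println!("{fast_link:?}");
}
```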
| } | ||
|
|
||
| let packet_size = packet.l3_length() + self.get_extra_length(); | ||
| let pass_hard_limit = self |
This pass_hard_limit actually indicates that the current queue length does NOT exceed packet_limit or byte_limit. Please consider a clearer variable name.
| } | ||
|
|
||
| let packet_size = packet.l3_length() + self.get_extra_length(); | ||
| let pass_hard_limit = self |
Same problem as pass_hard_limit in ARED implementation.
| where | ||
| P: Packet, | ||
| { | ||
| fn update_drop_probability(&mut self) { |
According to RFC 8033 and the PIE implementation in the Linux kernel (see https://github.com/torvalds/linux/blob/master/net/sched/sch_pie.c#L425), this update happens periodically, every 15 ms. In your implementation, however, it seems to happen only when a packet is enqueued or dequeued. This may lead to inconsistency between your implementation and the standard one.
Consider adding a real timer to trigger this update event, or using a catch-up loop in enqueue and dequeue like this:
let mut interval_update = Instant::now().saturating_duration_since(self.start_update);
let t_update = Duration::from_millis(15);
// Assumes update_drop_probability() advances self.start_update by one
// interval; otherwise this loop never terminates.
while interval_update >= t_update {
    self.update_drop_probability(t_update.as_secs_f64() * 1000.0);
    interval_update = Instant::now().saturating_duration_since(self.start_update);
}
| let epsilon = 0.125; | ||
| self.avg_drate = (1.0 - epsilon) * self.avg_drate + epsilon * dq_rate; | ||
| } | ||
| self.start_measurement = Some(now); |
It seems that the measurement should be ended here according to the pseudocode in RFC 8033 appendix B:
if ( PIE->dq_count_ >= DQ_THRESHOLD) {
...
PIE->in_measurement_ = FALSE;
}
Please check your implementation.
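A rough sketch of a measurement cycle that ends once enough bytes have been dequeued is shown below. It loosely follows the structure of the quoted pseudocode and is not a verified transcription of the RFC. `RateEstimator` and its fields are hypothetical names, and the EWMA weight of 0.125 is taken from the snippet under review.

```rust
use std::time::{Duration, Instant};

const DQ_THRESHOLD: usize = 16 * 1024; // 16 KiB

struct RateEstimator {
    in_measurement: bool,
    measurement_start: Instant,
    dq_count: usize,
    avg_drate: f64, // bytes per second
}

impl RateEstimator {
    fn new(now: Instant) -> Self {
        Self { in_measurement: false, measurement_start: now, dq_count: 0, avg_drate: 0.0 }
    }

    /// Call on each dequeue with the logical time, the packet size,
    /// and the number of bytes still queued.
    fn on_dequeue(&mut self, now: Instant, pkt_size: usize, backlog_bytes: usize) {
        if self.in_measurement {
            self.dq_count += pkt_size;
            if self.dq_count >= DQ_THRESHOLD {
                let elapsed = now.saturating_duration_since(self.measurement_start);
                if elapsed > Duration::ZERO {
                    let dq_rate = self.dq_count as f64 / elapsed.as_secs_f64();
                    let epsilon = 0.125;
                    self.avg_drate = (1.0 - epsilon) * self.avg_drate + epsilon * dq_rate;
                }
                // End the cycle instead of immediately restarting it.
                self.in_measurement = false;
            }
        }
        // Start a new cycle only when the backlog is large enough.
        if !self.in_measurement && backlog_bytes >= DQ_THRESHOLD {
            self.in_measurement = true;
            self.measurement_start = now;
            self.dq_count = 0;
        }
    }
}

fn main() {
    let t0 = Instant::now(); // only anchors the timeline
    let mut est = RateEstimator::new(t0);
    est.on_dequeue(t0, 1500, 20_000); // backlog is large: a cycle starts
    est.on_dequeue(t0 + Duration::from_secs(1), 16_384, 0); // cycle ends
    println!("in_measurement={} avg_drate={}", est.in_measurement, est.avg_drate);
}
```

Ending the cycle means a new measurement starts only while the queue is backlogged, so idle gaps are not folded into the rate estimate.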
| } else if self.p < 0.1 { | ||
| p_increment /= 2.0; | ||
| } | ||
| self.p += p_increment; |
I notice that the pseudocode in RFC 8033 appendix B has capping logic:
if (PIE->drop_prob_ >= 0.1 && p > 0.02) {
p = 0.02;
}
Your code doesn't have this. The relevant text is in RFC 8033 Section 5.5: Cap Drop Adjustment. Please check whether you should add this adjustment to your implementation.
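The cap from the quoted pseudocode can be sketched as a small pure function. `cap_delta` and `apply_delta` are hypothetical helper names, and the final clamp to [0, 1] is an assumption added here to keep the result a valid probability.

```rust
/// Limit the increment to 0.02 once the drop probability is at least 10%.
/// `drop_prob` is the current probability, `delta` the proposed change.
fn cap_delta(drop_prob: f64, delta: f64) -> f64 {
    if drop_prob >= 0.1 && delta > 0.02 {
        0.02
    } else {
        delta
    }
}

/// Apply the capped change and keep the result a valid probability
/// (the clamp to [0, 1] is an assumption of this sketch).
fn apply_delta(drop_prob: f64, delta: f64) -> f64 {
    (drop_prob + cap_delta(drop_prob, delta)).clamp(0.0, 1.0)
}

fn main() {
    // Above 10%: a large increment is capped at 0.02.
    println!("{}", cap_delta(0.2, 0.05));
    // Below 10%: the increment passes through unchanged.
    println!("{}", cap_delta(0.05, 0.05));
    // Decrements are never capped by this rule.
    println!("{}", cap_delta(0.2, -0.05));
    println!("{}", apply_delta(0.99, 0.5));
}
```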
|
|
||
| // RFC 8033 Section 4.1: Bypass random drop logic to be work conserving | ||
| let bypass_drop = | ||
| (self.old_del < self.config.ref_del / 2.0 && self.p < 0.2) || self.queue.len() <= 2; |
In the pseudocode, bypass_drop depends on the byte length of the queue, not the number of packets in it. Please check your implementation.
if ( (PIE->qdelay_old_ < QDELAY_REF/2 && PIE->drop_prob_ < 0.2)
|| (queue_.byte_length() <= 2 * MEAN_PKTSIZE) ) {
return ENQUE;
}
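Written as a pure function, the byte-based check from the pseudocode above might look like the sketch below. The function signature and parameter names are hypothetical; the mean packet size of 1500 bytes in the usage is an assumed value.

```rust
/// Returns true when the random-drop logic should be bypassed.
/// The second condition compares queued *bytes* against two mean-sized packets.
fn bypass_drop(
    qdelay_old_ms: f64,
    qdelay_ref_ms: f64,
    drop_prob: f64,
    queue_bytes: usize,
    mean_pkt_size: usize,
) -> bool {
    (qdelay_old_ms < qdelay_ref_ms / 2.0 && drop_prob < 0.2)
        || queue_bytes <= 2 * mean_pkt_size
}

fn main() {
    // Two 9000-byte packets: a packet-count check (len <= 2) would bypass,
    // but the byte-based check does not, because 18000 > 2 * 1500.
    println!("{}", bypass_drop(20.0, 15.0, 0.5, 18_000, 1500));

    // Many tiny packets: 30 packets of 64 bytes stay under 3000 bytes,
    // so the byte-based check bypasses where a count check would not.
    println!("{}", bypass_drop(20.0, 15.0, 0.5, 30 * 64, 1500));
}
```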
| self.update_avg(); | ||
|
|
||
| let packet_size = packet.l3_length() + self.get_extra_length(); | ||
| let pass_hard_limit = self |
Same problem as pass_hard_limit in ARED implementation.
| false | ||
| } | ||
| } else if avg >= max_th { | ||
| self.count_packet = 0; |
Same problem as your ARED implementation.
| false | ||
| } | ||
| } else if avg >= max_th { | ||
| self.count_packet = 0; |
According to the code in Linux kernel (see https://github.com/torvalds/linux/blob/master/include/net/red.h#L435), the count_packet should be set to -1 when avg >= max_th.
| if let Some(idle_start) = self.idle_start { | ||
| let now = Instant::now(); | ||
| let idle_duration = now.saturating_duration_since(idle_start); | ||
| let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us |
Why is 100 Mbps hard-coded here? The bandwidth can be configured to any value.
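A sketch of deriving the transmission time from a configurable bandwidth instead of a constant is shown below. `pkt_tx_time` and `decay_avg` are hypothetical helpers; how the queue actually learns the link bandwidth is not shown in this thread and is left as an assumption. The decay follows the usual RED idle rule, avg * (1 - w_q)^m, with m the number of packets that could have been sent while idle.

```rust
use std::time::Duration;

/// Time to transmit one packet of `pkt_bytes` at `bandwidth_bps` bits per second.
fn pkt_tx_time(pkt_bytes: u64, bandwidth_bps: u64) -> Duration {
    // Integer arithmetic in nanoseconds avoids floating-point rounding.
    let nanos = (pkt_bytes as u128 * 8 * 1_000_000_000) / bandwidth_bps as u128;
    Duration::from_nanos(nanos as u64)
}

/// Decay the average queue length over an idle period, as if `m` packets
/// had arrived at an empty queue: avg * (1 - w_q)^m.
fn decay_avg(avg: f64, w_q: f64, idle: Duration, tx_time: Duration) -> f64 {
    if tx_time.is_zero() {
        return avg; // avoid division by zero for degenerate inputs
    }
    let m = (idle.as_nanos() / tx_time.as_nanos()) as i32;
    avg * (1.0 - w_q).powi(m)
}

fn main() {
    // 1500 bytes at 100 Mbps reproduces the 120 us from the snippet above.
    let at_100m = pkt_tx_time(1500, 100_000_000);
    // The same packet at 1 Gbps takes a tenth of that.
    let at_1g = pkt_tx_time(1500, 1_000_000_000);
    println!("{at_100m:?} {at_1g:?}");

    let decayed = decay_avg(10.0, 0.5, Duration::from_micros(240), at_100m);
    println!("{decayed}");
}
```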
| if let Some(idle_start) = self.idle_start { | ||
| let now = Instant::now(); | ||
| let idle_duration = now.saturating_duration_since(idle_start); | ||
| let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us |
Same problem as your ARED implementation.