Skip to content

add RED, ARED and PIE#197

Open
CepheusC wants to merge 12 commits into
stack-rs:mainfrom
CepheusC:redpie
Open

add RED, ARED and PIE#197
CepheusC wants to merge 12 commits into
stack-rs:mainfrom
CepheusC:redpie

Conversation

@CepheusC

@CepheusC CepheusC commented Jun 9, 2026

Copy link
Copy Markdown

No description provided.

@Centaurus99 Centaurus99 requested a review from EwecaBcD June 9, 2026 07:43

@un-lock-able un-lock-able left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also remember to fix warnings from cargo clippy (in github actions) and run cargo fmt after modifications.

Comment thread rattan-core/src/cells/bandwidth/queue/ared.rs Outdated
Comment thread rattan-core/src/cells/bandwidth/queue/ared.rs Outdated
Comment thread rattan-core/src/cells/bandwidth/queue/ared.rs Outdated
Comment thread rattan-core/src/cells/bandwidth/queue/ared.rs Outdated
Comment thread rattan-core/src/cells/bandwidth/queue/ared.rs Outdated
Comment thread rattan-core/src/cells/bandwidth/queue/ared.rs Outdated
Comment thread rattan-core/src/cells/bandwidth/queue/red.rs
@Centaurus99 Centaurus99 requested review from Lethe10137 and Copilot and removed request for EwecaBcD June 12, 2026 06:04

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds three Active Queue Management (AQM) queue implementations (RED, Adaptive RED, and PIE) to rattan-core’s bandwidth cell queue module, enabling more realistic congestion signaling behavior than the existing tail/head drop queues.

Changes:

  • Introduces new queue implementations: RedQueue, AdaptiveRedQueue, and PieQueue, each with its own config struct and unit tests.
  • Exposes the new queues from rattan-core/src/cells/bandwidth/queue/mod.rs so they can be used by the rest of the crate.
  • Adds algorithm-specific state tracking (avg queue length / drop probability / burst allowance / rate estimation) and hard-limit enforcement (packet/byte limits).

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

File Description
rattan-core/src/cells/bandwidth/queue/mod.rs Registers and re-exports the new AQM queue modules.
rattan-core/src/cells/bandwidth/queue/red.rs Adds RED queue implementation + tests.
rattan-core/src/cells/bandwidth/queue/ared.rs Adds Adaptive RED (ARED) implementation + tests.
rattan-core/src/cells/bandwidth/queue/pie.rs Adds PIE implementation + tests.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +262 to +267
fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&P) -> bool,
{
self.queue.retain(|packet| f(packet));
}
Comment on lines +277 to +282
fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&P) -> bool,
{
self.queue.retain(|packet| f(packet));
}
Comment on lines +315 to +320
fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&P) -> bool,
{
self.queue.retain(|packet| f(packet));
}
Comment thread rattan-core/src/cells/bandwidth/queue/ared.rs
Comment on lines +469 to +472
std::thread::sleep(Duration::from_millis(15)); // Wait to exceed t_update

// This enqueue will trigger update_drop_probability()
queue.enqueue(create_packet(14));
@Centaurus99 Centaurus99 requested a review from EwecaBcD June 12, 2026 07:00
Comment on lines +172 to +182
fn update_max_p(&mut self) {
let target_min =
self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64;
let target_max =
self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64;
if self.average_queue_length > target_max && self.config.max_p <= 0.5 {
self.config.max_p += (self.config.max_p / 4.0).min(0.01);
} else if self.average_queue_length < target_min && self.config.max_p >= 0.01 {
self.config.max_p *= 0.9;
}
}

@Lethe10137 Lethe10137 Jun 12, 2026

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ARED paper ask us to constrain $max_p$ is constrained to remain within the range $[0.01, 0.5]$.

Your implementation pushes 0.4999 to about 0.6249. and 0.01 to 0.009. Actually let's implement that in a simpler way.

Suggested change
fn update_max_p(&mut self) {
let target_min =
self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64;
let target_max =
self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64;
if self.average_queue_length > target_max && self.config.max_p <= 0.5 {
self.config.max_p += (self.config.max_p / 4.0).min(0.01);
} else if self.average_queue_length < target_min && self.config.max_p >= 0.01 {
self.config.max_p *= 0.9;
}
}
fn update_max_p(&mut self) {
let target_min =
self.config.min_th as f64 + 0.4 * (self.config.max_th - self.config.min_th) as f64;
let target_max =
self.config.min_th as f64 + 0.6 * (self.config.max_th - self.config.min_th) as f64;
if self.average_queue_length > target_max {
self.config.max_p += (self.config.max_p / 4.0).min(0.01);
} else if self.average_queue_length < target_min {
self.config.max_p *= 0.9;
}
// Clamp to [0.01, 0.5]
self.config.max_p = self.config.max_p.clamp(0.01, 0.5);
}

p_b / (1.0 - self.count_packet as f64 * p_b)
};

let rand_val = random_range(0.0..1.0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This adds non-deterministic behavior. You can refer to how we used a seeded random generator to make the queue's behavior deterministic.

assert!(
drop_count > 0,
"Should have dropped some packets probabilistically"
);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not good enough to just check "drop some packets, but not all of them".

You could try to calculate actually how many packets (an exact number or a range) is expected to be dropped. Thus, when further modifications on the queue's implementation was made, we can be more confident that, there's no unexpected behavior change by the automated test.

@Lethe10137 Lethe10137 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please refer to #155 about the "logical timestamp". Conceptually these time-based queue should try to avoid calling Instant::now(), but refer to the timestamp of the packets, which is when the packet "should" be here.

return false;
}

let rand_val = random_range(0.0..1.0);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, try to make this deterministic with seeded Random generator.


fn update_avg_drate(&mut self, pkt_size: usize) {
let now = Instant::now();
let dq_threshold = 16384; // 16 KB

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

16 KiB

Suggested change
let dq_threshold = 16384; // 16 KB
let dq_threshold = 16384; // 16 KiB

P: Packet,
{
fn update_drop_probability(&mut self) {
let now = Instant::now();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The now should be a parameter passed in, which is the logical timestamp of the packet that triggered this.

let interval_update = Instant::now().saturating_duration_since(self.start_update);
let t_update = Duration::from_millis(15);
if interval_update >= t_update {
self.update_drop_probability();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass packet.get_timestamp() as an argument. The queue should use this as the "logical time", rather than Instant::now().

std::thread::sleep(Duration::from_millis(15)); // Wait to exceed t_update

// This enqueue will trigger update_drop_probability()
queue.enqueue(create_packet(14));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the queue were working based on the logical timestamp (the timestamp of the packets), you could just modify the timestamp of the packets.

queue.enqueue(create_packet(14));

assert!(
queue.p > 0.0,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What probability should it be updated to? Please make this more strict.

}

if let Some(idle_start) = self.idle_start {
let now = Instant::now();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, try to use the logical timestamp instead.

fn enqueue(&mut self, packet: P) {
// Simulate time-driven with event-driven approach
let interval_update = Instant::now().saturating_duration_since(self.start_update);
let t_update = Duration::from_millis(15);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please this update interval configurable and default to be 15ms, according to RFC 8033:

The update interval, T_UPDATE, is defaulted to be 15 milliseconds.
It MAY be reduced on high-speed links in order to provide smoother
response. The target latency value, QDELAY_REF, SHOULD be set to 15
milliseconds.

}

let packet_size = packet.l3_length() + self.get_extra_length();
let pass_hard_limit = self

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pass_hard_limit actually means whether current queue length does NOT pass the packet_limit or byte_limit. Please consider to use a better variable name.

}

let packet_size = packet.l3_length() + self.get_extra_length();
let pass_hard_limit = self

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem as pass_hard_limit in ARED implementation.

where
P: Packet,
{
fn update_drop_probability(&mut self) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to RFC 8033 and the PIE implementation in Linux kernel (see https://github.com/torvalds/linux/blob/master/net/sched/sch_pie.c#L425), this update happens every 15ms periodically. But in your implementation, it seems to happen only when there is a packet trying to enqueue or dequeue. This may lead to inconsistency between your implementation and the standard implementation.

Consider to add a real timer to trigger this update event, or use some circular update logic in enqueue and dequeue just like this:

let mut interval_update = Instant::now().saturating_duration_since(self.start_update);
let t_update = Duration::from_millis(15);

while interval_update >= t_update {
    self.update_drop_probability(t_update.as_secs_f64() * 1000.0);
    interval_update = Instant::now().saturating_duration_since(self.start_update);
}

let epsilon = 0.125;
self.avg_drate = (1.0 - epsilon) * self.avg_drate + epsilon * dq_rate;
}
self.start_measurement = Some(now);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the measurement should be ended here according to the pseudocode in RFC 8033 appendix B:

if ( PIE->dq_count_ >= DQ_THRESHOLD) {
    ...
    PIE->in_measurement_ = FALSE;
}

Please check your implementation.

} else if self.p < 0.1 {
p_increment /= 2.0;
}
self.p += p_increment;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice that there is a capping logic in the pseudocode in RFC 8033 appendix B:

if (PIE->drop_prob_ >= 0.1 && p > 0.02) {
    p = 0.02;
}

But you don't have this in your code. Relative content is in RFC 8033 Section 5.5: Cap Drop Adjustment. Please check if you should add this adjustment to your implementation.


// RFC 8033 Section 4.1: Bypass random drop logic to be work conserving
let bypass_drop =
(self.old_del < self.config.ref_del / 2.0 && self.p < 0.2) || self.queue.len() <= 2;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the pseudocode, the bypass_drop is relative to the byte length of the queue, not the packet length. Please check your implementation.

if ( (PIE->qdelay_old_ < QDELAY_REF/2 && PIE->drop_prob_ < 0.2)
     || (queue_.byte_length() <= 2 * MEAN_PKTSIZE) ) {
    return ENQUE;
}

self.update_avg();

let packet_size = packet.l3_length() + self.get_extra_length();
let pass_hard_limit = self

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem as pass_hard_limit in ARED implementation.

false
}
} else if avg >= max_th {
self.count_packet = 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem as your ARED implementation.

false
}
} else if avg >= max_th {
self.count_packet = 0;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the code in Linux kernel (see https://github.com/torvalds/linux/blob/master/include/net/red.h#L435), the count_packet should be set to -1 when avg >= max_th.

if let Some(idle_start) = self.idle_start {
let now = Instant::now();
let idle_duration = now.saturating_duration_since(idle_start);
let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why it's hard-coded 100Mbps here? The bandwidth can be customized to any value.

if let Some(idle_start) = self.idle_start {
let now = Instant::now();
let idle_duration = now.saturating_duration_since(idle_start);
let pkt_tx_time = 120.0; // 1500 bytes * 8 / 100Mbps = 120 us

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem as your ARED implementation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants