Skip to content

Commit

Permalink
Implement RACK
Browse files Browse the repository at this point in the history
This is a modified version of [RACK] for QUIC.

* initial timeout at `9/8 RTT` instead of `5/4 RTT` to keep the behavior
  the same when no out-of-order packets arrive
* reorder_window_mult adds fractions of `1/8 RTT` instead of `1/4 RTT` for no
  particular reason (except that it make the code more consistent to the
  initial timeout of `9/8 RTT`
* out of order packets, that weren't causing spurious retransmits still reset
  the `reorder_window_persist` to 16. TCP doesn't have the necessary
  information to do this
* `reorder_window_mult` is set high enough to prevent a spurious retransmit
  next time instead of just increasing by one. Can be done, because we have
  more RTT estimates for packets, that would have been spuriously
  retransmitted in TCP.

[RACK]: https://datatracker.ietf.org/doc/html/rfc8985
  • Loading branch information
mb committed Nov 14, 2023
1 parent 2582acd commit 453147e
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 14 deletions.
12 changes: 10 additions & 2 deletions neqo-transport/src/cc/classic_cc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
}

// Multi-packet version of OnPacketAckedCC
fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool {
// Check whether we are app limited before acked packets are removed
// from bytes_in_flight.
let is_app_limited = self.app_limited();
Expand All @@ -163,6 +168,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
MAX_DATAGRAM_SIZE * PACING_BURST_SIZE,
);

let mut exiting_recovery = false;
let mut new_acked = 0;
for pkt in acked_pkts {
qinfo!(
Expand All @@ -187,6 +193,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {

if self.state.in_recovery() {
self.set_state(State::CongestionAvoidance);
exiting_recovery = true;
qlog::metrics_updated(&mut self.qlog, &[QlogMetric::InRecovery(false)]);
}

Expand All @@ -196,7 +203,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
if is_app_limited {
self.cc_algorithm.on_app_limited();
qinfo!("on_packets_acked this={:p}, limited=1, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
return;
return exiting_recovery;
}

// Slow start, up to the slow start threshold.
Expand Down Expand Up @@ -247,6 +254,7 @@ impl<T: WindowAdjustment> CongestionControl for ClassicCongestionControl<T> {
],
);
qinfo!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked);
exiting_recovery
}

/// Update congestion controller state based on lost packets.
Expand Down
7 changes: 6 additions & 1 deletion neqo-transport/src/cc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ pub trait CongestionControl: Display + Debug {
#[must_use]
fn cwnd_avail(&self) -> usize;

fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant);
fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool;

/// Returns true if the congestion window was reduced.
fn on_packets_lost(
Expand Down
4 changes: 2 additions & 2 deletions neqo-transport/src/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,10 @@ impl Path {
}

/// Record packets as acknowledged with the sender.
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) {
pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], now: Instant) -> bool {
debug_assert!(self.is_primary());
self.sender
.on_packets_acked(acked_pkts, self.rtt.minimum(), now);
.on_packets_acked(acked_pkts, self.rtt.minimum(), now)
}

/// Record packets as lost with the sender.
Expand Down
87 changes: 80 additions & 7 deletions neqo-transport/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ pub(crate) struct LossRecoverySpace {
/// This is `None` if there were no out-of-order packets detected.
/// When set to `Some(T)`, time-based loss detection should be enabled.
first_ooo_time: Option<Instant>,
/// If no reordering has been observed, TODO: just say reo_wnd_mult != 0
reordering_seen: bool,
/// the RTO is RTT * (reo_wnd_mult + 9) / 8
///
/// this is basically the index of the first non-zero entry of `reo_wnd_persist`
reorder_window_mult: u32,
reorder_window_persist: u32,
}

impl LossRecoverySpace {
Expand All @@ -197,6 +204,9 @@ impl LossRecoverySpace {
in_flight_outstanding: 0,
sent_packets: BTreeMap::default(),
first_ooo_time: None,
reorder_window_mult: 0,
reorder_window_persist: 0,
reordering_seen: false,
}
}

Expand Down Expand Up @@ -384,18 +394,20 @@ impl LossRecoverySpace {
pub fn detect_lost_packets(
&mut self,
now: Instant,
loss_delay: Duration,
rtt_estimate: Duration,
cleanup_delay: Duration,
lost_packets: &mut Vec<SentPacket>,
) {
// Housekeeping.
self.remove_old_lost(now, cleanup_delay);

let loss_delay = rtt_estimate * (self.reorder_window_mult + 9) / 8;
qtrace!(
"detect lost {}: now={:?} delay={:?}",
"detect lost {}: now={:?} delay={:?}, multiplier={}",
self.space,
now,
loss_delay,
self.reorder_window_mult
);
self.first_ooo_time = None;

Expand All @@ -418,7 +430,7 @@ impl LossRecoverySpace {
packet.time_sent,
loss_delay
);
} else if largest_acked >= Some(*pn + PACKET_THRESHOLD) {
} else if !self.reordering_seen && largest_acked >= Some(*pn + PACKET_THRESHOLD) {
qtrace!(
"lost={}, is >= {} from largest acked {:?}",
pn,
Expand All @@ -438,6 +450,62 @@ impl LossRecoverySpace {

lost_packets.extend(lost_pns.iter().map(|pn| self.sent_packets[pn].clone()));
}

fn loss_delay(rtt_estimate: Duration) ->

pub fn detect_reordered_packets(
&mut self,
now: Instant,
acked_pkts: &[SentPacket],
rtt_estimate: Duration,
) {
// detect packet reordering
let mut max_rtt = Duration::default();
if let Some(largest_ack) = self.largest_acked {
for pkt in acked_pkts
.iter()
.filter(|pkt| pkt.cc_in_flight() && pkt.pn < largest_ack)
{
qinfo!("detect_reordered_packets largest_ack={}, pn={}", largest_ack, pkt.pn);
// reordering event
self.reordering_seen = true;
max_rtt = max(max_rtt, now.duration_since(pkt.time_sent));
}
}
// update reo_wnd
if max_rtt > rtt_estimate && !rtt_estimate.is_zero() {
// calculate reo_wnd necessary to accept the reordering event
// inverse of
// lost_delay = rtt_estimate * (self.reo_wnd_mult + 9) / 8;
// <=> self.reo_wnd_mult = (lost_delay / rtt_estimate) * 8 - 9
let multiplier = min(
(max_rtt.as_micros() * 8 / rtt_estimate.as_micros()) - 9 + 1,
8,
);
let multiplier = u32::try_from(multiplier).unwrap();
qinfo!(
"detect_reordered_packets max_rtt={}, rtt_estimate={} old_barrier={}, new_barrier={}",
max_rtt.as_micros(),
rtt_estimate.as_micros(),
(rtt_estimate * (self.reorder_window_mult + 9) / 8).as_micros(),
(rtt_estimate * (multiplier + 9) / 8).as_micros()
);
self.reorder_window_mult = max(self.reorder_window_mult, multiplier);
}
}

pub fn on_exiting_recovery(&mut self) {
if self.reorder_window_persist != 0 {
self.reorder_window_persist -= 1;
if self.reorder_window_persist == 0 {
self.reorder_window_mult = 0;
}
}
qinfo!(
"on_exiting_recovery reorder_window_persist={}, reorder_window_mult={}",
self.reorder_window_persist, self.reorder_window_mult
);
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -680,6 +748,9 @@ impl LossRecovery {
return (Vec::new(), Vec::new());
}

let rtt_estimate = primary_path.borrow().rtt().estimated_upper();
space.detect_reordered_packets(now, &acked_packets, rtt_estimate);

// Track largest PN acked per space
let prev_largest_acked = space.largest_acked_sent_time;
if Some(largest_acked) > space.largest_acked {
Expand All @@ -704,12 +775,11 @@ impl LossRecovery {
// We need to ensure that we have sent any PTO probes before they are removed
// as we rely on the count of in-flight packets to determine whether to send
// another probe. Removing them too soon would result in not sending on PTO.
let loss_delay = primary_path.borrow().rtt().loss_delay();
let cleanup_delay = self.pto_period(primary_path.borrow().rtt(), pn_space);
let mut lost = Vec::new();
self.spaces.get_mut(pn_space).unwrap().detect_lost_packets(
now,
loss_delay,
rtt_estimate,
cleanup_delay,
&mut lost,
);
Expand All @@ -725,9 +795,12 @@ impl LossRecovery {
// This must happen after on_packets_lost. If in recovery, this could
// take us out, and then lost packets will start a new recovery period
// when it shouldn't.
primary_path
if primary_path
.borrow_mut()
.on_packets_acked(&acked_packets, now);
.on_packets_acked(&acked_packets, now)
{
self.spaces.get_mut(pn_space).unwrap().on_exiting_recovery();
}

self.pto_state = None;

Expand Down
10 changes: 10 additions & 0 deletions neqo-transport/src/rtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,16 @@ impl RttEstimate {
max(rtt * 9 / 8, GRANULARITY)
}

/// Frin RFC9002 Section 6.1.2 Time Treshhold
/// Using max(smoothed_rtt, latest_rtt) protects from the two following cases:
// * the latest RTT sample is lower than the smoothed RTT, perhaps due to reordering where the
// acknowledgment encountered a shorter path;
// * the latest RTT sample is higher than the smoothed RTT, perhaps due to a sustained
// increase in the actual RTT, but the smoothed RTT has not yet caught up.
pub fn estimated_upper(&self) -> Duration {
max(self.latest_rtt, self.smoothed_rtt)
}

pub fn first_sample_time(&self) -> Option<Instant> {
self.first_sample_time
}
Expand Down
9 changes: 7 additions & 2 deletions neqo-transport/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,13 @@ impl PacketSender {
self.cc.cwnd_avail()
}

pub fn on_packets_acked(&mut self, acked_pkts: &[SentPacket], min_rtt: Duration, now: Instant) {
self.cc.on_packets_acked(acked_pkts, min_rtt, now);
pub fn on_packets_acked(
&mut self,
acked_pkts: &[SentPacket],
min_rtt: Duration,
now: Instant,
) -> bool {
self.cc.on_packets_acked(acked_pkts, min_rtt, now)
}

/// Called when packets are lost. Returns true if the congestion window was reduced.
Expand Down

0 comments on commit 453147e

Please sign in to comment.