Skip to content

Commit

Permalink
refactor(client): use process_output and process_multiple_input
Browse files Browse the repository at this point in the history
`neqo_transport::Connection` offers 4 process methods:

- `process`
- `process_output`
- `process_input`
- `process_multiple_input`

Where `process` is a wrapper around `process_input` and `process_output` calling
both in sequence.

https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-transport/src/connection/mod.rs#L1099-L1107

Where `process_input` delegates to `process_multiple_input`.

https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-transport/src/connection/mod.rs#L979-L1000

Previously `neqo-client` would use `process` only. Thus continuously
interleaving output and input. Say `neqo-client` would have multiple datagrams
buffered through a GRO read, it could potentially have to do a write in between
each `process` calls, as each call to `process` with an input datagram might
return an output datagram to be written.

With this commit `neqo-client` uses `process_output` and `process_multiple_input`
directly, thus reducing interleaving on batch reads (GRO and in the future
recvmmsg) and in the future batch writes (GSO and sendmmsg).

By using `process_multiple_input` instead of `process` or `process_input`,
auxiliarry logic, like `self.cleanup_closed_streams` only has to run per input
datagram batch, and not for each input datagram.

Extracted from mozilla#1741.
  • Loading branch information
mxinden committed Apr 7, 2024
1 parent 5dfe106 commit e32cf8b
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 25 deletions.
11 changes: 9 additions & 2 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,15 @@ pub(crate) fn create_client(
}

impl super::Client for Connection {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
self.process(dgram, now)
fn process_output(&mut self, now: Instant) -> Output {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
self.process_multiple_input(dgrams, now);
}

fn close<S>(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S)
Expand Down
11 changes: 9 additions & 2 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,15 @@ impl super::Client for Http3Client {
matches!(self.state(), Http3State::Closed(..))
}

fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
self.process(dgram, now)
fn process_output(&mut self, now: Instant) -> Output {
self.process_output(now)
}

fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
{
self.process_multiple_input(dgrams, now);
}

fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
Expand Down
36 changes: 22 additions & 14 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,10 @@ trait Handler {

/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_output(&mut self, now: Instant) -> Output;
fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>;
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display;
Expand Down Expand Up @@ -404,7 +407,7 @@ impl<'a, H: Handler> Runner<'a, H> {
}
}

self.process(None).await?;
self.process_output().await?;

if self.client.is_closed() {
if self.args.stats {
Expand All @@ -414,26 +417,17 @@ impl<'a, H: Handler> Runner<'a, H> {
}

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
for dgram in &dgrams {
self.process(Some(dgram)).await?;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Socket => self.process_multiple_input()?,
Ready::Timeout => {
self.timeout = None;
}
}
}
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
async fn process_output(&mut self) -> Result<(), io::Error> {
loop {
match self.client.process(dgram.take(), Instant::now()) {
match self.client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
Expand All @@ -452,6 +446,20 @@ impl<'a, H: Handler> Runner<'a, H> {

Ok(())
}

fn process_multiple_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
self.client
.process_multiple_input(dgrams.iter(), Instant::now());
self.handler.maybe_key_update(&mut self.client)?;
}

Ok(())
}
}

fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res<NeqoQlog> {
Expand Down
7 changes: 3 additions & 4 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -880,11 +880,10 @@ impl Http3Client {
pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I::IntoIter: ExactSizeIterator,
{
let dgrams = dgrams.into_iter();
qtrace!([self], "Process multiple datagrams, len={}", dgrams.len());
if dgrams.len() == 0 {
let mut dgrams = dgrams.into_iter().peekable();
qtrace!([self], "Process multiple datagrams");
if dgrams.peek().is_none() {
return;
}
self.conn.process_multiple_input(dgrams, now);
Expand Down
5 changes: 2 additions & 3 deletions neqo-transport/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,10 +985,9 @@ impl Connection {
pub fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant)
where
I: IntoIterator<Item = &'a Datagram>,
I::IntoIter: ExactSizeIterator,
{
let dgrams = dgrams.into_iter();
if dgrams.len() == 0 {
let mut dgrams = dgrams.into_iter().peekable();
if dgrams.peek().is_none() {
return;
}

Expand Down

0 comments on commit e32cf8b

Please sign in to comment.