Skip to content

Commit

Permalink
refactor(client): use process_output and process_input
Browse files Browse the repository at this point in the history
`neqo_transport::Connection` offers 3 process methods:

- `process`
- `process_output`
- `process_input`

Where `process` is a wrapper around `process_input` and `process_output` calling
both in sequence.

https://github.com/mozilla/neqo/blob/5dfe106669ccb695187511305c21b8e8a8775e91/neqo-transport/src/connection/mod.rs#L1099-L1107

Previously `neqo-client` would use `process` only. Thus continuously
interleaving output and input. Say `neqo-client` would have multiple datagrams
buffered through a GRO read, it could potentially have to do a write in between
each `process` calls, as each call to `process` with an input datagram might
return an output datagram to be written.

With this commit `neqo-client` uses `process_output` and `process_input`
directly, thus reducing interleaving on batch reads (GRO and in the future
recvmmsg) and in the future batch writes (GSO and sendmmsg).

Extracted from mozilla#1741.
  • Loading branch information
mxinden committed Apr 7, 2024
1 parent 5dfe106 commit b859582
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 18 deletions.
8 changes: 6 additions & 2 deletions neqo-bin/src/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ pub(crate) fn create_client(
}

impl super::Client for Connection {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
self.process(dgram, now)
fn process_output(&mut self, now: Instant) -> Output {
self.process_output(now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

fn close<S>(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S)
Expand Down
8 changes: 6 additions & 2 deletions neqo-bin/src/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ impl super::Client for Http3Client {
matches!(self.state(), Http3State::Closed(..))
}

fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output {
self.process(dgram, now)
fn process_output(&mut self, now: Instant) -> Output {
self.process_output(now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
Expand Down
35 changes: 21 additions & 14 deletions neqo-bin/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ trait Handler {

/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_output(&mut self, now: Instant) -> Output;
fn process_input(&mut self, dgram: &Datagram, now: Instant);
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display;
Expand Down Expand Up @@ -404,7 +405,7 @@ impl<'a, H: Handler> Runner<'a, H> {
}
}

self.process(None).await?;
self.process_output().await?;

if self.client.is_closed() {
if self.args.stats {
Expand All @@ -414,26 +415,17 @@ impl<'a, H: Handler> Runner<'a, H> {
}

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
for dgram in &dgrams {
self.process(Some(dgram)).await?;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Socket => self.process_input()?,
Ready::Timeout => {
self.timeout = None;
}
}
}
}

async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> {
async fn process_output(&mut self) -> Result<(), io::Error> {
loop {
match self.client.process(dgram.take(), Instant::now()) {
match self.client.process_output(Instant::now()) {
Output::Datagram(dgram) => {
self.socket.writable().await?;
self.socket.send(dgram)?;
Expand All @@ -452,6 +444,21 @@ impl<'a, H: Handler> Runner<'a, H> {

Ok(())
}

fn process_input(&mut self) -> Res<()> {
loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
for dgram in &dgrams {
self.client.process_input(dgram, Instant::now());
}
self.handler.maybe_key_update(&mut self.client)?;
}

Ok(())
}
}

fn qlog_new(args: &Args, hostname: &str, cid: &ConnectionId) -> Res<NeqoQlog> {
Expand Down

0 comments on commit b859582

Please sign in to comment.