Skip to content

Commit

Permalink
rx read hangs because of redundant/excessive try_recv() (hyperium#219)
Browse files Browse the repository at this point in the history
If self.remaining_data is 0, recv_data() calls poll_next() which
will do a try_recv() and bring in and decode the frame. And right
after that we end up doing poll_data() which will again do a try_recv
and this time it can end up hanging if the other end has not sent
any more data. poll_data clearly checks if self.remaining_data is
NON-ZERO at the top of that API, so before calling try_recv() it
KNOWS that there is data to be read. So even if try_recv() says
pending, it can/should go ahead and pull in the existing data.
  • Loading branch information
gopakumarce authored Dec 15, 2023
1 parent 5c16195 commit a530808
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
7 changes: 5 additions & 2 deletions h3/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::task::{Context, Poll};

use bytes::Buf;

use futures_util::ready;
use tracing::trace;

use crate::stream::{BufRecvStream, WriteBuf};
Expand Down Expand Up @@ -96,7 +95,11 @@ where
return Poll::Ready(Ok(None));
};

let end = ready!(self.try_recv(cx))?;
let end = match self.try_recv(cx) {
Poll::Ready(Ok(end)) => end,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => false,
};
let data = self.stream.buf_mut().take_chunk(self.remaining_data);

match (data, end) {
Expand Down
37 changes: 37 additions & 0 deletions h3/src/tests/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,43 @@ async fn server_drop_close() {
tokio::join!(server_fut, client_fut);
}

// In this test the client calls send_data() without doing a finish(),
// i.e client keeps the body stream open. And cient expects server to
// read_data() and send a response
#[tokio::test]
async fn server_send_data_without_finish() {
let mut pair = Pair::default();
let mut server = pair.server();

let client_fut = async {
let (_driver, mut send_request) = client::new(pair.client().await).await.unwrap();

let mut req = send_request
.send_request(Request::get("http://no.way").body(()).unwrap())
.await
.unwrap();
let data = vec![0; 100];
let _ = req
.send_data(bytes::Bytes::copy_from_slice(&data))
.await
.unwrap();
let _ = req.recv_response().await.unwrap();
};

let server_fut = async {
let conn = server.next().await;
let mut incoming = server::Connection::new(conn).await.unwrap();
let (_, mut stream) = incoming.accept().await.unwrap().unwrap();
let mut data = stream.recv_data().await.unwrap().unwrap();
let data = data.copy_to_bytes(data.remaining());
assert_eq!(data.len(), 100);
response(stream).await;
server.endpoint.wait_idle().await;
};

tokio::join!(server_fut, client_fut);
}

#[tokio::test]
async fn client_close_only_on_last_sender_drop() {
let mut pair = Pair::default();
Expand Down

0 comments on commit a530808

Please sign in to comment.