From a530808c1db5a47f1e8c9180b4249a0bee672015 Mon Sep 17 00:00:00 2001 From: gopakumarce Date: Fri, 15 Dec 2023 11:03:08 -0500 Subject: [PATCH] rx read hangs because of redundant/excessive try_recv() (#219) If self.remaining_data is 0, recv_data() calls poll_next() which will do a try_recv() and bring in and decode the frame. And right after that we end up doing poll_data() which will again do a try_recv and this time it can end up hanging if the other end has not sent any more data. poll_data clearly checks if self.remaining_data is NON-ZERO at the top of that API, so before calling try_recv() it KNOWS that there is data to be read. So even if try_recv() says pending, it can/should go ahead and pull in the existing data. --- h3/src/frame.rs | 7 +++++-- h3/src/tests/connection.rs | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/h3/src/frame.rs b/h3/src/frame.rs index 3c8a58dc..a166fa1b 100644 --- a/h3/src/frame.rs +++ b/h3/src/frame.rs @@ -2,7 +2,6 @@ use std::task::{Context, Poll}; use bytes::Buf; -use futures_util::ready; use tracing::trace; use crate::stream::{BufRecvStream, WriteBuf}; @@ -96,7 +95,11 @@ where return Poll::Ready(Ok(None)); }; - let end = ready!(self.try_recv(cx))?; + let end = match self.try_recv(cx) { + Poll::Ready(Ok(end)) => end, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + Poll::Pending => false, + }; let data = self.stream.buf_mut().take_chunk(self.remaining_data); match (data, end) { diff --git a/h3/src/tests/connection.rs b/h3/src/tests/connection.rs index 458db214..186a7f12 100644 --- a/h3/src/tests/connection.rs +++ b/h3/src/tests/connection.rs @@ -93,6 +93,43 @@ async fn server_drop_close() { tokio::join!(server_fut, client_fut); } +// In this test the client calls send_data() without doing a finish(), +// i.e client keeps the body stream open. And cient expects server to +// read_data() and send a response +#[tokio::test] +async fn server_send_data_without_finish() { + let mut pair = Pair::default(); + let mut server = pair.server(); + + let client_fut = async { + let (_driver, mut send_request) = client::new(pair.client().await).await.unwrap(); + + let mut req = send_request + .send_request(Request::get("http://no.way").body(()).unwrap()) + .await + .unwrap(); + let data = vec![0; 100]; + let _ = req + .send_data(bytes::Bytes::copy_from_slice(&data)) + .await + .unwrap(); + let _ = req.recv_response().await.unwrap(); + }; + + let server_fut = async { + let conn = server.next().await; + let mut incoming = server::Connection::new(conn).await.unwrap(); + let (_, mut stream) = incoming.accept().await.unwrap().unwrap(); + let mut data = stream.recv_data().await.unwrap().unwrap(); + let data = data.copy_to_bytes(data.remaining()); + assert_eq!(data.len(), 100); + response(stream).await; + server.endpoint.wait_idle().await; + }; + + tokio::join!(server_fut, client_fut); +} + #[tokio::test] async fn client_close_only_on_last_sender_drop() { let mut pair = Pair::default();