Description
Code to reproduce
use h2::client;
use http::{Method, Request};
use std::{
mem,
pin::Pin,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf},
net::TcpStream,
};
use tokio_native_tls::native_tls::TlsConnector;
// Shorthand so every `Result<T>` written in this file means `std::io::Result<T>`.
type Result<T> = std::io::Result<T>;
/// Wrapper around a stream `T` that remembers whether a shutdown has been
/// requested on it.
struct DelayedShutdownStream<T> {
    /// `true` once a shutdown has been requested on this wrapper.
    inflight_shutdown: bool,
    /// The wrapped stream.
    inner: T,
}

impl<T> DelayedShutdownStream<T> {
    /// Wraps `inner` with no shutdown in flight.
    fn new(inner: T) -> Self {
        DelayedShutdownStream {
            inner,
            inflight_shutdown: false,
        }
    }
}
/// Reads are passed straight through to the wrapped stream; the shutdown flag
/// is not consulted.
impl<T> AsyncRead for DelayedShutdownStream<T>
where
    T: AsyncRead + Unpin,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        // `T: Unpin`, so the wrapper can be un-pinned and the inner stream re-pinned.
        let this = self.get_mut();
        let inner = Pin::new(&mut this.inner);
        inner.poll_read(cx, buf)
    }
}
/// Write half of the wrapper.
///
/// Writes are forwarded unchanged. `poll_flush` and `poll_shutdown` each log
/// a line to stdout, and together they enforce one rule: once a shutdown has
/// been requested, a flush must not be polled again.
impl<T> AsyncWrite for DelayedShutdownStream<T>
where
    T: AsyncWrite + Unpin,
{
    /// Forwards the write to the wrapped stream.
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        let this = self.get_mut();
        Pin::new(&mut this.inner).poll_write(cx, buf)
    }

    /// Logs the call, then forwards the flush to the wrapped stream.
    ///
    /// # Panics
    ///
    /// Panics if `poll_shutdown` has already been called on this wrapper.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        println!("polling flush");
        let this = self.get_mut();
        assert!(
            !this.inflight_shutdown,
            "poll_flush called while shutdown in flight"
        );
        Pin::new(&mut this.inner).poll_flush(cx)
    }

    /// Logs the call and marks a shutdown as in flight.
    ///
    /// The first call never touches the wrapped stream: it wakes the task and
    /// returns `Poll::Pending`. Every later call forwards to the wrapped
    /// stream's `poll_shutdown`.
    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        println!("polling shutdown");
        let this = self.get_mut();
        let already_requested = mem::replace(&mut this.inflight_shutdown, true);
        if !already_requested {
            // Ask to be polled again right away so the next call reaches the inner stream.
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Pin::new(&mut this.inner).poll_shutdown(cx)
    }
}
/// Program entry point: runs the HTTP/2 scenario in `main_h2` on the Tokio runtime.
#[tokio::main]
pub async fn main() -> Result<()> {
    // Swap the call below for `_main_tcp().await` to run the plain-TCP scenario instead.
    main_h2().await
}
/// Plain-TCP scenario: connects to `1.1.1.1:443`, wraps the socket in a
/// `DelayedShutdownStream`, then flushes it once and shuts it down.
///
/// # Errors
///
/// Returns the I/O error from connecting, flushing or shutting down.
pub async fn _main_tcp() -> Result<()> {
    let tcp = TcpStream::connect("1.1.1.1:443").await?;
    println!("Connected");

    let mut wrapped = DelayedShutdownStream::new(tcp);
    wrapped.flush().await?;
    wrapped.shutdown().await?;
    println!("Shutdown done");

    Ok(())
}
/// HTTP/2 scenario: opens a TLS connection (ALPN `h2`) to `1.1.1.1:443`
/// through a `DelayedShutdownStream`, sends a single `GET https://1.1.1.1/`,
/// prints the response head, then drops the request handle and waits for the
/// spawned connection task to finish.
///
/// # Errors
///
/// Returns the I/O error from the TCP connect, or the connection task's join
/// error converted into an I/O error.
///
/// # Panics
///
/// Panics if building the TLS connector, the TLS handshake, the HTTP/2
/// handshake, building or sending the request, or awaiting the response fails.
pub async fn main_h2() -> Result<()> {
    let (mut sender, driver) = {
        // Plain TCP first, then TLS on top of it, advertising "h2" via ALPN.
        let tcp = TcpStream::connect("1.1.1.1:443").await?;
        let native_connector = TlsConnector::builder()
            .request_alpns(&["h2"])
            .build()
            .unwrap();
        let tls = tokio_native_tls::TlsConnector::from(native_connector);
        let tls_stream = tls.connect("1.1.1.1", tcp).await.unwrap();

        // All HTTP/2 traffic goes through the instrumented wrapper.
        let wrapped = DelayedShutdownStream::new(tls_stream);
        let (sender, connection) = client::handshake(wrapped).await.expect("handshake");

        // The connection future is driven on its own task.
        let driver = tokio::spawn(async move {
            connection.await.unwrap();
        });

        (sender.ready().await.expect("h2 ready"), driver)
    };

    {
        // Bodiless GET request.
        let request = Request::builder()
            .method(Method::GET)
            .uri("https://1.1.1.1/")
            .body(())
            .unwrap();

        // `true` marks the request as complete, so the returned send handle
        // is never used to stream a body.
        let (response, _send) = sender.send_request(request, true).unwrap();
        let reply = response.await.expect("response");
        let (head, _) = reply.into_parts();
        println!("Received response: {:?}", head);
    }

    // Release the request handle before waiting on the connection task.
    drop(sender);
    println!("Waiting for connection task to finish");
    driver.await?;
    println!("Connection task finished");

    Ok(())
}
Actual result
`polling flush` appears many times, even after `polling shutdown`. The unexpected flush leads to a panic.
polling flush
polling flush
Received response: Parts { status: 301, version: HTTP/2.0, headers: {"date": "Sun, 26 Jan 2025 13:19:55 GMT", "content-length": "0", "location": "https://one.one.one.one/", "report-to": "{\"endpoints\":[{\"url\":\"https:\/\/a.nel.cloudflare.com\/report\/v4?s=gqPSqV%2FCguv1iLTmtVVEr5DQ7uaGmTuHvQDZX6Zjly6Dp%2BaVknfUzue3V4HMlJAI4TRFitR7FcvELAnnCFtPMr6j7Tbc7CLStsKmE5dsBIS%2F0L3RM4zJWHo%3D\"}],\"group\":\"cf-nel\",\"max_age\":604800}", "nel": "{\"report_to\":\"cf-nel\",\"max_age\":604800}", "server": "cloudflare", "cf-ray": "9080cc25ceb7fcfb-SIN"} }
polling flush
Waiting for connection task to finish
polling flush
polling shutdown
polling flush
thread 'tokio-runtime-worker' panicked at src\bin\testh2_pending.rs:49:13:
poll_flush called while shutdown in flight
stack backtrace:
(...backtrace omitted...)
Expected result
When `poll_shutdown` returns `Poll::Pending`, `poll_flush` should not be called afterwards. Calling `_main_tcp` produces the following output:
Connected
polling flush
polling shutdown
polling shutdown
Shutdown done
Background
This issue also relates to compio-rs/cyper#25. In `compio`, a stream cannot have a `flush` request and a `shutdown` request pending at the same time. We tried to work around the unexpected behavior caused here (i.e. `poll_flush` + `poll_shutdown` in the same task `poll`), but it ended up causing more issues, such as `connection`s not transitioning to the `Closed` state. I guess it's better to fix the root issue in h2, hence bringing it here instead of introducing another workaround.
Regarding the fix, I would suggest adding a flag to guard this `poll_flush` call so that it is only called when necessary.