poll_flush is called on inner connection after poll_shutdown has been issued #835

Closed
@bdbai

Description

Code to reproduce

use h2::client;

use http::{Method, Request};
use std::{
    mem,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::{
    io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf},
    net::TcpStream,
};
use tokio_native_tls::native_tls::TlsConnector;

type Result<T> = std::io::Result<T>;

// Wrapper stream that makes the first poll_shutdown return Poll::Pending
// and panics if poll_flush is called after shutdown has been initiated.
struct DelayedShutdownStream<T> {
    inflight_shutdown: bool,
    inner: T,
}

impl<T> DelayedShutdownStream<T> {
    fn new(inner: T) -> Self {
        Self {
            inflight_shutdown: false,
            inner,
        }
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for DelayedShutdownStream<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for DelayedShutdownStream<T> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        println!("polling flush");
        if self.inflight_shutdown {
            panic!("poll_flush called while shutdown in flight");
        }
        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        println!("polling shutdown");
        if mem::replace(&mut self.inflight_shutdown, true) {
            // Second and later calls: forward the shutdown to the inner stream.
            return Pin::new(&mut self.get_mut().inner).poll_shutdown(cx);
        }
        // First call: mark the shutdown as in flight and stay pending, so the
        // caller must poll poll_shutdown again (and must not flush meanwhile).
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

#[tokio::main]
pub async fn main() -> Result<()> {
    // _main_tcp().await
    main_h2().await
}
pub async fn _main_tcp() -> Result<()> {
    let stream = TcpStream::connect("1.1.1.1:443").await?;
    println!("Connected");
    let mut stream = DelayedShutdownStream::new(stream);
    stream.flush().await?;
    stream.shutdown().await?;
    println!("Shutdown done");
    Ok(())
}

pub async fn main_h2() -> Result<()> {
    let (mut h2, conn_task) = {
        // Establish TCP connection to the server.
        let stream = TcpStream::connect("1.1.1.1:443").await?;
        let builder = tokio_native_tls::TlsConnector::from(
            TlsConnector::builder()
                .request_alpns(&["h2"])
                .build()
                .unwrap(),
        );
        let stream = builder.connect("1.1.1.1", stream).await.unwrap();
        let stream = DelayedShutdownStream::new(stream);
        let (h2, connection) = client::handshake(stream).await.expect("handshake");

        let conn_task = tokio::spawn(async move {
            connection.await.unwrap();
        });
        let h2 = h2.ready().await.expect("h2 ready");
        (h2, conn_task)
    };
    {
        // Prepare the HTTP request to send to the server.
        let request = Request::builder()
            .method(Method::GET)
            .uri("https://1.1.1.1/")
            .body(())
            .unwrap();

        // Send the request. The second tuple item allows the caller
        // to stream a request body.
        let (response, _send) = h2.send_request(request, true).unwrap();

        let (head, _) = response.await.expect("response").into_parts();

        println!("Received response: {:?}", head);
    }
    drop(h2);
    println!("Waiting for connection task to finish");
    conn_task.await?;
    println!("Connection task finished");
    Ok(())
}
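
For reference, the repro assumes roughly these Cargo dependencies (the versions and the native-tls "alpn" feature, which request_alpns needs, are my assumptions, not taken from the report):

[dependencies]
h2 = "0.4"
http = "1"
tokio = { version = "1", features = ["full"] }
tokio-native-tls = "0.3"
native-tls = { version = "0.2", features = ["alpn"] }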

Actual result

polling flush is printed many times, even after polling shutdown. The unexpected poll_flush call after poll_shutdown has started is what trips the panic in the wrapper.

polling flush
polling flush
Received response: Parts { status: 301, version: HTTP/2.0, headers: {"date": "Sun, 26 Jan 2025 13:19:55 GMT", "content-length": "0", "location": "https://one.one.one.one/", "report-to": "{\"endpoints\":[{\"url\":\"https:\/\/a.nel.cloudflare.com\/report\/v4?s=gqPSqV%2FCguv1iLTmtVVEr5DQ7uaGmTuHvQDZX6Zjly6Dp%2BaVknfUzue3V4HMlJAI4TRFitR7FcvELAnnCFtPMr6j7Tbc7CLStsKmE5dsBIS%2F0L3RM4zJWHo%3D\"}],\"group\":\"cf-nel\",\"max_age\":604800}", "nel": "{\"report_to\":\"cf-nel\",\"max_age\":604800}", "server": "cloudflare", "cf-ray": "9080cc25ceb7fcfb-SIN"} }
polling flush
Waiting for connection task to finish
polling flush
polling shutdown
polling flush
thread 'tokio-runtime-worker' panicked at src\bin\testh2_pending.rs:49:13:
poll_flush called while shutdown in flight
stack backtrace:
(...backtrace omitted...)

Expected result

Once poll_shutdown has been called and returns Poll::Pending, poll_flush should not be called afterwards; only poll_shutdown should be polled until it completes. Calling _main_tcp produces the following output, with no flush after the shutdown begins:

Connected
polling flush
polling shutdown
polling shutdown
Shutdown done
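
This matches tokio's usual write/close discipline: flush to completion first, then drive only poll_shutdown until it is ready. A minimal sketch of that contract (the helper name graceful_close is mine; it is essentially what the explicit flush().await and shutdown().await calls in _main_tcp do):

use std::future::poll_fn;
use std::pin::Pin;
use tokio::io::AsyncWrite;

async fn graceful_close<W: AsyncWrite + Unpin>(w: &mut W) -> std::io::Result<()> {
    // Flush any buffered data before starting the shutdown.
    poll_fn(|cx| Pin::new(&mut *w).poll_flush(cx)).await?;
    // After this point only poll_shutdown is polled until it returns Ready;
    // no poll_flush is interleaved.
    poll_fn(|cx| Pin::new(&mut *w).poll_shutdown(cx)).await
}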

Background

This issue also relates to compio-rs/cyper#25. In compio, a stream cannot have a flush request and a shutdown request pending at the same time. We tried to work around the unexpected behavior described here (i.e. poll_flush and poll_shutdown being issued in the same task poll), but that ended up causing more issues, such as connections not transitioning to the Closed state. It seems better to fix the root cause in h2, hence reporting it here instead of adding another workaround.

Regarding the fix, I would suggest adding a flag to guard this poll_flush call so that it is only made when flushing is still appropriate, i.e. before a shutdown has been initiated.
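
For illustration, a minimal sketch of such a guard (Driver, poll_close, and the field names are hypothetical stand-ins, not h2's actual internals):

use std::io;
use std::pin::Pin;
use std::task::{ready, Context, Poll};
use tokio::io::AsyncWrite;

// Hypothetical connection driver; not h2's real types.
struct Driver<T> {
    io: T,
    shutdown_started: bool, // the suggested guard flag
}

impl<T: AsyncWrite + Unpin> Driver<T> {
    fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Flush only while shutdown has not been initiated; once
        // poll_shutdown has been polled, never go back to poll_flush.
        if !self.shutdown_started {
            ready!(Pin::new(&mut self.io).poll_flush(cx))?;
            self.shutdown_started = true;
        }
        Pin::new(&mut self.io).poll_shutdown(cx)
    }
}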
