Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

poll_flush is called on inner connection after poll_shutdown being issued #835

Closed
bdbai opened this issue Jan 26, 2025 · 0 comments · Fixed by #836
Closed

poll_flush is called on inner connection after poll_shutdown being issued #835

bdbai opened this issue Jan 26, 2025 · 0 comments · Fixed by #836

Comments

@bdbai
Copy link
Contributor

bdbai commented Jan 26, 2025

Code to reproduce

use h2::client;

use http::{Method, Request};
use std::{
    mem,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::{
    io::{AsyncRead, AsyncWrite, AsyncWriteExt, ReadBuf},
    net::TcpStream,
};
use tokio_native_tls::native_tls::TlsConnector;

type Result<T> = std::io::Result<T>;

struct DelayedShutdownStream<T> {
    inflight_shutdown: bool,
    inner: T,
}

impl<T> DelayedShutdownStream<T> {
    fn new(inner: T) -> Self {
        Self {
            inflight_shutdown: false,
            inner,
        }
    }
}

impl<T: AsyncRead + Unpin> AsyncRead for DelayedShutdownStream<T> {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<Result<()>> {
        Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
    }
}

impl<T: AsyncWrite + Unpin> AsyncWrite for DelayedShutdownStream<T> {
    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        println!("polling flush");
        if self.inflight_shutdown {
            panic!("poll_flush called while shutdown in flight");
        }
        Pin::new(&mut self.get_mut().inner).poll_flush(cx)
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
        println!("polling shutdown");
        if mem::replace(&mut self.inflight_shutdown, true) {
            return Pin::new(&mut self.get_mut().inner).poll_shutdown(cx);
        }
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

#[tokio::main]
pub async fn main() -> Result<()> {
    // _main_tcp().await
    main_h2().await
}
pub async fn _main_tcp() -> Result<()> {
    let stream = TcpStream::connect("1.1.1.1:443").await?;
    println!("Connected");
    let mut stream = DelayedShutdownStream::new(stream);
    stream.flush().await?;
    stream.shutdown().await?;
    println!("Shutdown done");
    Ok(())
}

pub async fn main_h2() -> Result<()> {
    let (mut h2, conn_task) = {
        // Establish TCP connection to the server.
        let stream = TcpStream::connect("1.1.1.1:443").await?;
        let builder = tokio_native_tls::TlsConnector::from(
            TlsConnector::builder()
                .request_alpns(&["h2"])
                .build()
                .unwrap(),
        );
        let stream = builder.connect("1.1.1.1", stream).await.unwrap();
        let stream = DelayedShutdownStream::new(stream);
        let (h2, connection) = client::handshake(stream).await.expect("handshake");

        let conn_task = tokio::spawn(async move {
            connection.await.unwrap();
        });
        let h2 = h2.ready().await.expect("h2 ready");
        (h2, conn_task)
    };
    {
        // Prepare the HTTP request to send to the server.
        let request = Request::builder()
            .method(Method::GET)
            .uri("https://1.1.1.1/")
            .body(())
            .unwrap();

        // Send the request. The second tuple item allows the caller
        // to stream a request body.
        let (response, _send) = h2.send_request(request, true).unwrap();

        let (head, _) = response.await.expect("response").into_parts();

        println!("Received response: {:?}", head);
    }
    drop(h2);
    println!("Waiting for connection task to finish");
    conn_task.await?;
    println!("Connection task finished");
    Ok(())
}

Actual result

polling flush appears many times even after polling shutdown. The unexpected flush leads to a panic.

polling flush
polling flush
Received response: Parts { status: 301, version: HTTP/2.0, headers: {"date": "Sun, 26 Jan 2025 13:19:55 GMT", "content-length": "0", "location": "https://one.one.one.one/", "report-to": "{\"endpoints\":[{\"url\":\"https:\/\/a.nel.cloudflare.com\/report\/v4?s=gqPSqV%2FCguv1iLTmtVVEr5DQ7uaGmTuHvQDZX6Zjly6Dp%2BaVknfUzue3V4HMlJAI4TRFitR7FcvELAnnCFtPMr6j7Tbc7CLStsKmE5dsBIS%2F0L3RM4zJWHo%3D\"}],\"group\":\"cf-nel\",\"max_age\":604800}", "nel": "{\"report_to\":\"cf-nel\",\"max_age\":604800}", "server": "cloudflare", "cf-ray": "9080cc25ceb7fcfb-SIN"} }
polling flush
Waiting for connection task to finish
polling flush
polling shutdown
polling flush
thread 'tokio-runtime-worker' panicked at src\bin\testh2_pending.rs:49:13:
poll_flush called while shutdown in flight
stack backtrace:
(...backtrace omitted...)

Expected result

When poll_shutdown returns Poll::Pending, poll_flush should not be called afterwards. Calling _main_tcp produces the following output:

Connected
polling flush
polling shutdown
polling shutdown
Shutdown done

Background

This issue also relates to compio-rs/cyper#25. In compio, a stream cannot have a flush request and a shutdown request pending at the same time. We tried to workaround the unexpected behavior caused here (i.e. poll_flush + poll_shutdown in the same task poll), but it ended up causing more issues such as connections not transitioning to Closed state. I guess it's better to fix the root issue in h2, hence bringing it here instead of introducing another workaround.

Regarding the fix, I would suggest adding a flag to guard this poll_flush call so that it will only be called when necessary.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant