-
-
Notifications
You must be signed in to change notification settings - Fork 317
Description
I am trying to implement a basic server push scenario where the server makes multiple push requests. The client is first expected to process pushed requests and only then await for the main response. What I noticed is that when I simulate a delay on server side between sending push requests and the main response, push_promise()
seems to be stuck on client side after receiving all of the promises. It looks like receiving the main response does not wake up the waiter.
However, when I remove the delay or make the client firstly await for the main response, and only then call push_promise()
, everything works fine.
Firstly, my question is - Is server push in h2 expected to be used this way? I was not able to find any documentation about this or at least a basic usage example. If yes, is this a bug or I am misusing the library somewhere?
Code example:
Client:
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
// Establish TCP connection to the server.
let tcp = TcpStream::connect("127.0.0.1:5928").await?;
let (h2, connection) = client::handshake(tcp).await?;
tokio::spawn(async move {
connection.await.unwrap();
});
let mut h2 = h2.ready().await?;
// Prepare the HTTP request to send to the server.
let request = Request::builder()
.method(Method::GET)
.header("test_key", "test_value")
.uri("https://www.example.com/")
.body(())
.expect("Request error");
let (mut response, _) = h2.send_request(request.clone(), true).expect("Send request error");
// Awaiting for response consumes the object, so we need to get the stream of promises first.
let mut pushes = response.push_promises();
while let Some(push) = pushes.push_promise().await {
println!("Push {:?}", push);
let push_promise = push.unwrap();
let (request, pushed_response_future) = push_promise.into_parts();
println!("Pushed request: {:?}", request);
let (head_pushed, ref mut body_pushed) = pushed_response_future.await.unwrap().into_parts();
println!("Pushed data_head: {:?}", head_pushed);
while let Some(chunk) = body_pushed.data().await {
println!("Pushed data {:?}", chunk);
}
}
// We never get here, above while loop is stuck after the last promise was processed.
println!("Finished receiving pushed promises!");
let (head, mut body) = response.await?.into_parts();
println!("Received response: {:?}", head);
while let Some(chunk) = body.data().await {
println!("Main response data {:?}", chunk);
}
Ok(())
}
Server:
#[tokio::main]
pub async fn main() -> Result<(), Box<dyn Error>> {
let mut listener = TcpListener::bind("127.0.0.1:5928").await?;
println!("Listening on {:?}", listener.local_addr());
loop {
if let Ok((socket, _peer_addr)) = listener.accept().await {
socket.set_nodelay(true)?; // Disable Nagle's algorithm
tokio::spawn(async move {
if let Err(e) = handle(socket).await {
println!(" -> err={:?}", e);
}
});
}
}
}
async fn handle(socket: TcpStream) -> Result<(), Box<dyn Error>> {
let mut connection = server::handshake(socket).await?;
println!("H2 connection bound");
while let Some(result) = connection.accept().await {
tokio::spawn(async move {
let (request, mut respond) = result.unwrap();
println!("GOT request: {:?}", request);
for i in 0..50 {
send_push(i, &mut respond);
}
// Simulate a delay.
tokio::time::sleep(Duration::from_secs(5)).await;
println!("Sending response");
let response = Response::builder().status(StatusCode::OK).body(()).unwrap();
let mut send = respond.send_response(response, false).unwrap();
send.send_data("hello world".into(), true).unwrap();
println!("Sending response complete");
});
}
println!("~~~~~~~~~~~~~~~~~~~~~~~~~~~ H2 connection CLOSE !!!!!! ~~~~~~~~~~~");
Ok(())
}
fn send_push<B>(id: usize, send_response: &mut SendResponse<B>)
where
B: bytes::Buf + From<Bytes>
{
let uri = format!("http://www.example.com/{:?}", id);
let pushed_req = Request::builder()
.uri(uri)
.body(())
.unwrap();
let pushed_rsp = http::Response::builder().status(200).body(()).unwrap();
let mut send_pushed = send_response
.push_request(pushed_req)
.unwrap()
.send_response(pushed_rsp, false)
.unwrap();
send_pushed.send_data(Bytes::from("a").into(), false).unwrap();
send_pushed.send_data(Bytes::from("b").into(), false).unwrap();
send_pushed.send_data(Bytes::from("c").into(), true).unwrap();
}