Skip to content

Commit

Permalink
make h3 able to connect to nginx (hyperium#242)
Browse files Browse the repository at this point in the history
* control stream is a unidirectional stream

* logging level debug for unsupported frame

* initiate qpack steams and dont wait on grease stream

* cleanup

* chore: refactor grease stream handling in ConnectionInner

* remove wrong comment

* bring back comment

* add tokio macros

* join3
  • Loading branch information
Ruben2424 authored Jun 3, 2024
1 parent d1ec7c4 commit 716097b
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 62 deletions.
214 changes: 158 additions & 56 deletions h3/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
convert::TryFrom,
marker::PhantomData,
sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard},
task::{Context, Poll},
};
Expand All @@ -8,7 +9,7 @@ use bytes::{Buf, Bytes, BytesMut};
use futures_util::{future, ready};
use http::HeaderMap;
use stream::WriteBuf;
use tracing::{trace, warn};
use tracing::warn;

use crate::{
config::{Config, Settings},
Expand All @@ -21,7 +22,7 @@ use crate::{
varint::VarInt,
},
qpack,
quic::{self, SendStream as _},
quic::{self, SendStream},
stream::{self, AcceptRecvStream, AcceptedRecvStream, BufRecvStream, UniStreamHeader},
webtransport::SessionId,
};
Expand Down Expand Up @@ -107,7 +108,9 @@ where
pub conn: C,
control_send: C::SendStream,
control_recv: Option<FrameStream<C::RecvStream, B>>,
decoder_send: Option<C::SendStream>,
decoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
encoder_send: Option<C::SendStream>,
encoder_recv: Option<AcceptedRecvStream<C::RecvStream, B>>,
/// Buffers incoming uni/recv streams which have yet to be claimed.
///
Expand All @@ -134,15 +137,37 @@ where

got_peer_settings: bool,
pub send_grease_frame: bool,
// tells if the grease steam should be sent
send_grease_stream_flag: bool,
// step of the grease sending poll fn
grease_step: GreaseStatus<C::SendStream, B>,
pub config: Config,
}

enum GreaseStatus<S, B>
where
S: SendStream<B>,
B: Buf,
{
/// Grease stream is not started
NotStarted(PhantomData<B>),
/// Grease steam is started without data
Started(Option<S>),
/// Grease stream is started with data
DataPrepared(Option<S>),
/// Data is sent on grease stream
DataSent(S),
/// Grease stream is finished
Finished,
}

impl<B, C> ConnectionInner<C, B>
where
C: quic::Connection<B>,
B: Buf,
{
pub async fn send_settings(&mut self) -> Result<(), Error> {
/// Sends the settings and initializes the control streams
pub async fn send_control_stream_headers(&mut self) -> Result<(), Error> {
#[cfg(test)]
if !self.config.send_settings {
return Ok(());
Expand Down Expand Up @@ -178,14 +203,32 @@ where
//# Endpoints MUST NOT require any data to be received from
//# the peer prior to sending the SETTINGS frame; settings MUST be sent
//# as soon as the transport is ready to send data.
trace!("Sending Settings frame: {:#x?}", settings);
stream::write(
&mut self.control_send,
WriteBuf::from(UniStreamHeader::Control(settings)),

let mut decoder_send = Option::take(&mut self.decoder_send);
let mut encoder_send = Option::take(&mut self.encoder_send);

let (control, ..) = future::join3(
stream::write(
&mut self.control_send,
WriteBuf::from(UniStreamHeader::Control(settings)),
),
async {
if let Some(stream) = &mut decoder_send {
let _ = stream::write(stream, WriteBuf::from(UniStreamHeader::Decoder)).await;
}
},
async {
if let Some(stream) = &mut encoder_send {
let _ = stream::write(stream, WriteBuf::from(UniStreamHeader::Encoder)).await;
}
},
)
.await?;
.await;

Ok(())
self.decoder_send = decoder_send;
self.encoder_send = encoder_send;

control
}

/// Initiates the connection and opens a control stream
Expand All @@ -194,10 +237,26 @@ where
//# Endpoints SHOULD create the HTTP control stream as well as the
//# unidirectional streams required by mandatory extensions (such as the
//# QPACK encoder and decoder streams) first, and then create additional
//# streams as allowed by their peer.
let control_send = future::poll_fn(|cx| conn.poll_open_send(cx))
.await
.map_err(|e| Code::H3_STREAM_CREATION_ERROR.with_transport(e))?;

// start streams
let (control_send, qpack_encoder, qpack_decoder) = (
future::poll_fn(|cx| conn.poll_open_send(cx)).await,
future::poll_fn(|cx| conn.poll_open_send(cx)).await,
future::poll_fn(|cx| conn.poll_open_send(cx)).await,
);

let control_send =
control_send.map_err(|e| Code::H3_STREAM_CREATION_ERROR.with_transport(e))?;

let qpack_encoder = match qpack_encoder {
Ok(stream) => Some(stream),
Err(_) => None,
};

let qpack_decoder = match qpack_decoder {
Ok(stream) => Some(stream),
Err(_) => None,
};

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.1
//= type=implication
Expand All @@ -216,20 +275,14 @@ where
send_grease_frame: config.send_grease,
config,
accepted_streams: Default::default(),
decoder_send: qpack_decoder,
encoder_send: qpack_encoder,
// send grease stream if configured
send_grease_stream_flag: config.send_grease,
// start at first step
grease_step: GreaseStatus::NotStarted(PhantomData),
};

conn_inner.send_settings().await?;

// start a grease stream
if config.send_grease {
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.8
//= type=implication
//# Frame types of the format 0x1f * N + 0x21 for non-negative integer
//# values of N are reserved to exercise the requirement that unknown
//# types be ignored (Section 9). These frames have no semantics, and
//# they MAY be sent on any stream where frames are allowed to be sent.
conn_inner.start_grease_stream().await;
}
conn_inner.send_control_stream_headers().await?;

Ok(conn_inner)
}
Expand Down Expand Up @@ -479,6 +532,17 @@ where
}
}
};

if self.send_grease_stream_flag {
//= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.8
//= type=implication
//# Frame types of the format 0x1f * N + 0x21 for non-negative integer
//# values of N are reserved to exercise the requirement that unknown
//# types be ignored (Section 9). These frames have no semantics, and
//# they MAY be sent on any stream where frames are allowed to be sent.
ready!(self.poll_grease_stream(cx));
}

Poll::Ready(res)
}

Expand Down Expand Up @@ -533,50 +597,88 @@ where
code.with_reason(reason.as_ref(), crate::error::ErrorLevel::ConnectionError)
}

/// starts an grease stream
/// https://www.rfc-editor.org/rfc/rfc9114.html#stream-grease
async fn start_grease_stream(&mut self) {
// start the stream
let mut grease_stream = match future::poll_fn(|cx| self.conn.poll_open_send(cx))
.await
.map_err(|e| Code::H3_STREAM_CREATION_ERROR.with_transport(e))
{
Err(err) => {
warn!("grease stream creation failed with {}", err);
return;
}
Ok(grease) => grease,
// start grease stream and send data
fn poll_grease_stream(&mut self, cx: &mut Context<'_>) -> Poll<()> {
if matches!(self.grease_step, GreaseStatus::NotStarted(_)) {
self.grease_step = match self.conn.poll_open_send(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => {
// could not create grease stream
// dont try again
self.send_grease_stream_flag = false;
warn!("grease stream creation failed with");
return Poll::Ready(());
}
Poll::Ready(Ok(stream)) => GreaseStatus::Started(Some(stream)),
};
};

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.3
//# Stream types of the format 0x1f * N + 0x21 for non-negative integer
//# values of N are reserved to exercise the requirement that unknown
//# types be ignored. These streams have no semantics, and they can be
//# sent when application-layer padding is desired. They MAY also be
//# sent on connections where no data is currently being transferred.
match stream::write(&mut grease_stream, (StreamType::grease(), Frame::Grease)).await {
Ok(()) => (),
Err(err) => {
warn!("write data on grease stream failed with {}", err);
return;
if let GreaseStatus::Started(stream) = &mut self.grease_step {
if let Some(stream) = stream {
if stream
.send_data((StreamType::grease(), Frame::Grease))
.is_err()
{
self.send_grease_stream_flag = false;
warn!("write data on grease stream failed with");
return Poll::Ready(());
};
}
}
self.grease_step = GreaseStatus::DataPrepared(stream.take());
};

if let GreaseStatus::DataPrepared(stream) = &mut self.grease_step {
if let Some(stream) = stream {
match stream.poll_ready(cx) {
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => {
// could not write grease frame
// dont try again
self.send_grease_stream_flag = false;
warn!("write data on grease stream failed with");
return Poll::Ready(());
}
};
}
self.grease_step = GreaseStatus::DataSent(match stream.take() {
Some(stream) => stream,
None => {
// this should never happen
self.send_grease_stream_flag = false;
return Poll::Ready(());
}
});
};

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.3
//# When sending a reserved stream type,
//# the implementation MAY either terminate the stream cleanly or reset
//# it.

//= https://www.rfc-editor.org/rfc/rfc9114#section-6.2.3
//# When resetting the stream, either the H3_NO_ERROR error code or
//# a reserved error code (Section 8.1) SHOULD be used.
match future::poll_fn(|cx| grease_stream.poll_finish(cx))
.await
.map_err(|e| Code::H3_NO_ERROR.with_transport(e))
{
Ok(()) => (),
Err(err) => warn!("grease stream error on close {}", err),
if let GreaseStatus::DataSent(stream) = &mut self.grease_step {
match stream.poll_finish(cx) {
Poll::Ready(Ok(_)) => (),
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(_)) => {
// could not finish grease stream
// dont try again
self.send_grease_stream_flag = false;
warn!("finish grease stream failed with");
return Poll::Ready(());
}
};
self.grease_step = GreaseStatus::Finished;
};

// grease stream is closed
// dont do another one
self.send_grease_stream_flag = false;
Poll::Ready(())
}

#[allow(missing_docs)]
Expand Down
2 changes: 1 addition & 1 deletion h3/src/proto/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ impl Settings {
//# H3_SETTINGS_ERROR.
settings.insert(identifier, value)?;
} else {
tracing::warn!("Unsupported setting: {:#x?}", identifier);
tracing::debug!("Unsupported setting: {:#x?}", identifier);
}
}
Ok(settings)
Expand Down
13 changes: 8 additions & 5 deletions h3/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ where
pub enum UniStreamHeader {
Control(Settings),
WebTransportUni(SessionId),
Encoder,
Decoder,
}

impl Encode for UniStreamHeader {
Expand All @@ -132,6 +134,12 @@ impl Encode for UniStreamHeader {
StreamType::WEBTRANSPORT_UNI.encode(buf);
session_id.encode(buf);
}
UniStreamHeader::Encoder => {
StreamType::ENCODER.encode(buf);
}
UniStreamHeader::Decoder => {
StreamType::DECODER.encode(buf);
}
}
}
}
Expand All @@ -154,17 +162,12 @@ where
}

pub enum BidiStreamHeader {
Control(Settings),
WebTransportBidi(SessionId),
}

impl Encode for BidiStreamHeader {
fn encode<B: BufMut>(&self, buf: &mut B) {
match self {
Self::Control(settings) => {
StreamType::CONTROL.encode(buf);
settings.encode(buf);
}
Self::WebTransportBidi(session_id) => {
StreamType::WEBTRANSPORT_BIDI.encode(buf);
session_id.encode(buf);
Expand Down

0 comments on commit 716097b

Please sign in to comment.