diff --git a/Cargo.toml b/Cargo.toml index 355c5a1e..647fdddf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "h3", "h3-quinn", "h3-webtransport", + "h3-datagram", # Internal "examples", diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 420fbcc6..ed99d7d2 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -11,7 +11,7 @@ anyhow = "1.0" bytes = "1" futures = "0.3" h3 = { path = "../h3", features = ["tracing"] } -h3-quinn = { path = "../h3-quinn", features = ["tracing"] } +h3-quinn = { path = "../h3-quinn", features = ["tracing", "datagram"] } h3-webtransport = { path = "../h3-webtransport" } http = "1" quinn = { version = "0.11", default-features = false, features = [ @@ -39,6 +39,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ octets = "0.3.0" tracing-tree = { version = "0.3" } +h3-datagram = { path = "../h3-datagram" } [features] tree = [] diff --git a/examples/webtransport_server.rs b/examples/webtransport_server.rs index 52fbfe7f..19d532a2 100644 --- a/examples/webtransport_server.rs +++ b/examples/webtransport_server.rs @@ -3,9 +3,10 @@ use bytes::{BufMut, Bytes, BytesMut}; use h3::{ error::ErrorLevel, ext::Protocol, - quic::{self, RecvDatagramExt, SendDatagramExt, SendStreamUnframed}, + quic::{self, SendStreamUnframed}, server::Connection, }; +use h3_datagram::quic_traits::{RecvDatagramExt, SendDatagramExt}; use h3_quinn::quinn::{self, crypto::rustls::QuicServerConfig}; use h3_webtransport::{ server::{self, WebTransportSession}, @@ -291,6 +292,8 @@ where stream::SendStream: AsyncWrite, C::BidiStream: SendStreamUnframed, C::SendStream: SendStreamUnframed, + ::Error: h3::quic::Error, + >::Error: h3::quic::Error, { let session_id = session.session_id(); diff --git a/h3-datagram/Cargo.toml b/h3-datagram/Cargo.toml new file mode 100644 index 00000000..dd09c9d0 --- /dev/null +++ b/h3-datagram/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "h3-datagram" +version = "0.0.1" +edition = "2021" + +[dependencies] +#h3 = { path = "../h3" } +bytes = "1.4" +pin-project-lite = { version = "0.2", default-features = false } + +[dependencies.h3] +version = "0.0.6" +path = "../h3" +features = ["i-implement-a-third-party-backend-and-opt-into-breaking-changes"] diff --git a/h3-datagram/readme.md b/h3-datagram/readme.md new file mode 100644 index 00000000..ae01ab82 --- /dev/null +++ b/h3-datagram/readme.md @@ -0,0 +1,8 @@ +# H3 Datagram + +this crate provides an implementation of the [h3-datagram](https://datatracker.ietf.org/doc/html/rfc9297) spec that works with the h3 crate. + +## Usage +As stated in the [rfc](https://datatracker.ietf.org/doc/html/rfc9297#abstract) this is intended to be used for protocol extensions like [Web-Transport](https://datatracker.ietf.org/doc/draft-ietf-webtrans-http3/) and not directly by applications. + +> HTTP Datagrams and the Capsule Protocol are intended for use by HTTP extensions, not applications. \ No newline at end of file diff --git a/h3-datagram/src/datagram.rs b/h3-datagram/src/datagram.rs new file mode 100644 index 00000000..f4b2823f --- /dev/null +++ b/h3-datagram/src/datagram.rs @@ -0,0 +1,69 @@ +use bytes::{Buf, Bytes}; +use h3::{error::Code, proto::varint::VarInt, quic::StreamId, Error}; + +/// HTTP datagram frames +/// See: +pub struct Datagram { + /// Stream id divided by 4 + stream_id: StreamId, + /// The data contained in the datagram + payload: B, +} + +impl Datagram +where + B: Buf, +{ + /// Creates a new datagram frame + pub fn new(stream_id: StreamId, payload: B) -> Self { + assert!( + stream_id.into_inner() % 4 == 0, + "StreamId is not divisible by 4" + ); + Self { stream_id, payload } + } + + /// Decodes a datagram frame from the QUIC datagram + pub fn decode(mut buf: B) -> Result { + let q_stream_id = VarInt::decode(&mut buf) + .map_err(|_| Code::H3_DATAGRAM_ERROR.with_cause("Malformed datagram frame"))?; + + //= https://www.rfc-editor.org/rfc/rfc9297#section-2.1 + // Quarter Stream ID: A variable-length integer that contains the value of the client-initiated bidirectional + // stream that this datagram is associated with divided by four (the division by four stems + // from the fact that HTTP requests are sent on client-initiated bidirectional streams, + // which have stream IDs that are divisible by four). The largest legal QUIC stream ID + // value is 262-1, so the largest legal value of the Quarter Stream ID field is 260-1. + // Receipt of an HTTP/3 Datagram that includes a larger value MUST be treated as an HTTP/3 + // connection error of type H3_DATAGRAM_ERROR (0x33). + let stream_id = StreamId::try_from(u64::from(q_stream_id) * 4) + .map_err(|_| Code::H3_DATAGRAM_ERROR.with_cause("Invalid stream id"))?; + + let payload = buf; + + Ok(Self { stream_id, payload }) + } + + #[inline] + /// Returns the associated stream id of the datagram + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + + #[inline] + /// Returns the datagram payload + pub fn payload(&self) -> &B { + &self.payload + } + + /// Encode the datagram to wire format + pub fn encode(self, buf: &mut D) { + (VarInt::from(self.stream_id) / 4).encode(buf); + buf.put(self.payload); + } + + /// Returns the datagram payload + pub fn into_payload(self) -> B { + self.payload + } +} diff --git a/h3-datagram/src/datagram_traits.rs b/h3-datagram/src/datagram_traits.rs new file mode 100644 index 00000000..ee0c33ac --- /dev/null +++ b/h3-datagram/src/datagram_traits.rs @@ -0,0 +1,21 @@ +//! Traits which define the user API for datagrams. +//! These traits are implemented for the client and server types in the `h3` crate. + +use bytes::Buf; +use h3::{ + quic::{self, StreamId}, + Error, +}; + +use crate::server::ReadDatagram; + +pub trait HandleDatagramsExt +where + B: Buf, + C: quic::Connection, +{ + /// Sends a datagram + fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error>; + /// Reads an incoming datagram + fn read_datagram(&mut self) -> ReadDatagram; +} diff --git a/h3-datagram/src/lib.rs b/h3-datagram/src/lib.rs new file mode 100644 index 00000000..9c0e1fca --- /dev/null +++ b/h3-datagram/src/lib.rs @@ -0,0 +1,4 @@ +pub mod datagram; +pub mod datagram_traits; +pub mod quic_traits; +pub mod server; diff --git a/h3-datagram/src/quic_traits.rs b/h3-datagram/src/quic_traits.rs new file mode 100644 index 00000000..a1f6ca9e --- /dev/null +++ b/h3-datagram/src/quic_traits.rs @@ -0,0 +1,38 @@ +//! QUIC Transport traits +//! +//! This module includes traits and types meant to allow being generic over any +//! QUIC implementation. + +use core::task; +use std::{error::Error, task::Poll}; + +use bytes::Buf; + +use crate::datagram::Datagram; + +/// Extends the `Connection` trait for sending datagrams +/// +/// See: +pub trait SendDatagramExt { + /// The error type that can occur when sending a datagram + type Error: Into>; + + /// Send a datagram + fn send_datagram(&mut self, data: Datagram) -> Result<(), Self::Error>; +} + +/// Extends the `Connection` trait for receiving datagrams +/// +/// See: +pub trait RecvDatagramExt { + /// The type of `Buf` for *raw* datagrams (without the stream_id decoded) + type Buf: Buf; + /// The error type that can occur when receiving a datagram + type Error: Into>; + + /// Poll the connection for incoming datagrams. + fn poll_accept_datagram( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>>; +} diff --git a/h3-datagram/src/server.rs b/h3-datagram/src/server.rs new file mode 100644 index 00000000..c35acff2 --- /dev/null +++ b/h3-datagram/src/server.rs @@ -0,0 +1,73 @@ +//! server API + +use std::{ + future::Future, + marker::PhantomData, + task::{ready, Context, Poll}, +}; + +use bytes::Buf; +use h3::{ + quic::{self, StreamId}, + server::Connection, + Error, +}; +use pin_project_lite::pin_project; + +use crate::{ + datagram::Datagram, + datagram_traits::HandleDatagramsExt, + quic_traits::{self, RecvDatagramExt, SendDatagramExt}, +}; + +impl HandleDatagramsExt for Connection +where + B: Buf, + C: quic::Connection + SendDatagramExt + RecvDatagramExt, + ::Error: h3::quic::Error + 'static, + >::Error: h3::quic::Error + 'static, +{ + /// Sends a datagram + fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error> { + self.inner + .conn + .send_datagram(Datagram::new(stream_id, data))?; + Ok(()) + } + + /// Reads an incoming datagram + fn read_datagram(&mut self) -> ReadDatagram { + ReadDatagram { + conn: self, + _marker: PhantomData, + } + } +} + +pin_project! { + /// Future for [`Connection::read_datagram`] + pub struct ReadDatagram<'a, C, B> + where + C: quic::Connection, + B: Buf, + { + conn: &'a mut Connection, + _marker: PhantomData, + } +} + +impl<'a, C, B> Future for ReadDatagram<'a, C, B> +where + C: quic::Connection + RecvDatagramExt, + ::Error: h3::quic::Error + 'static, + B: Buf, +{ + type Output = Result>, Error>; + + fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? { + Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))), + None => Poll::Ready(Ok(None)), + } + } +} diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml index bea10395..39fb08fe 100644 --- a/h3-quinn/Cargo.toml +++ b/h3-quinn/Cargo.toml @@ -21,7 +21,9 @@ quinn = { version = "0.11", default-features = false, features = [ tokio-util = { version = "0.7.9" } futures = { version = "0.3.28" } tokio = { version = "1", features = ["io-util"], default-features = false } +h3-datagram = { path = "../h3-datagram", optional = true } tracing = { version = "0.1.40", optional = true } [features] tracing = ["dep:tracing"] +datagram = ["dep:h3-datagram"] \ No newline at end of file diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index dd374a3e..4fd8e634 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -19,13 +19,14 @@ use futures::{ stream::{self}, Stream, StreamExt, }; + +#[cfg(feature = "datagram")] +use h3_datagram::{datagram::Datagram, quic_traits}; + pub use quinn::{self, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError}; use quinn::{ApplicationClose, ClosedStream, ReadDatagram}; -use h3::{ - ext::Datagram, - quic::{self, Error, StreamId, WriteBuf}, -}; +use h3::quic::{self, Error, StreamId, WriteBuf}; use tokio_util::sync::ReusableBoxFuture; #[cfg(feature = "tracing")] @@ -248,7 +249,8 @@ where } } -impl quic::SendDatagramExt for Connection +#[cfg(feature = "datagram")] +impl quic_traits::SendDatagramExt for Connection where B: Buf, { @@ -265,7 +267,8 @@ where } } -impl quic::RecvDatagramExt for Connection { +#[cfg(feature = "datagram")] +impl quic_traits::RecvDatagramExt for Connection { type Buf = Bytes; type Error = ConnectionError; diff --git a/h3-webtransport/Cargo.toml b/h3-webtransport/Cargo.toml index 347ad945..89aae1db 100644 --- a/h3-webtransport/Cargo.toml +++ b/h3-webtransport/Cargo.toml @@ -12,6 +12,7 @@ http = "1" pin-project-lite = { version = "0.2", default-features = false } tracing = "0.1.37" tokio = { version = "1.28", default-features = false } +h3-datagram = { path = "../h3-datagram" } [dependencies.h3] version = "0.0.6" diff --git a/h3-webtransport/src/server.rs b/h3-webtransport/src/server.rs index 5ae8886b..490c9ea0 100644 --- a/h3-webtransport/src/server.rs +++ b/h3-webtransport/src/server.rs @@ -12,10 +12,10 @@ use futures_util::{future::poll_fn, ready, Future}; use h3::{ connection::ConnectionState, error::{Code, ErrorLevel}, - ext::{Datagram, Protocol}, + ext::Protocol, frame::FrameStream, proto::frame::Frame, - quic::{self, OpenStreams, RecvDatagramExt, SendDatagramExt, WriteBuf}, + quic::{self, OpenStreams, WriteBuf}, server::Connection, server::RequestStream, Error, @@ -24,6 +24,11 @@ use h3::{ quic::SendStreamUnframed, stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader}, }; +use h3_datagram::{ + datagram::Datagram, + datagram_traits::HandleDatagramsExt, + quic_traits::{RecvDatagramExt, SendDatagramExt}, +}; use http::{Method, Request, Response, StatusCode}; use h3::webtransport::SessionId; @@ -39,6 +44,7 @@ use crate::stream::{BidiStream, RecvStream, SendStream}; pub struct WebTransportSession where C: quic::Connection, + Connection: HandleDatagramsExt, B: Buf, { // See: https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-2-3 @@ -51,6 +57,7 @@ where impl WebTransportSession where + Connection: HandleDatagramsExt, C: quic::Connection, B: Buf, { @@ -374,6 +381,7 @@ impl<'a, C, B> Future for ReadDatagram<'a, C, B> where C: quic::Connection + RecvDatagramExt, B: Buf, + ::Error: h3::quic::Error + 'static, { type Output = Result, Error>; diff --git a/h3/Cargo.toml b/h3/Cargo.toml index 6531585b..9417f2a9 100644 --- a/h3/Cargo.toml +++ b/h3/Cargo.toml @@ -54,3 +54,4 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ ] } futures = { version = "0.3.28" } tokio-util = { version = "0.7.9" } +h3-datagram = {path = "../h3-datagram" } \ No newline at end of file diff --git a/h3/src/error.rs b/h3/src/error.rs index 94f193f1..9cb19ca9 100644 --- a/h3/src/error.rs +++ b/h3/src/error.rs @@ -200,7 +200,8 @@ impl Code { }) } - pub(crate) fn with_cause>(self, cause: E) -> Error { + #[doc(hidden)] + pub fn with_cause>(self, cause: E) -> Error { Error::from(self).with_cause(cause) } } diff --git a/h3/src/ext.rs b/h3/src/ext.rs index ff1f0e58..281d5736 100644 --- a/h3/src/ext.rs +++ b/h3/src/ext.rs @@ -1,16 +1,7 @@ //! Extensions for the HTTP/3 protocol. -use std::convert::TryFrom; use std::str::FromStr; -use bytes::{Buf, Bytes}; - -use crate::{ - error::Code, - proto::{stream::StreamId, varint::VarInt}, - Error, -}; - /// Describes the `:protocol` pseudo-header for extended connect /// /// See: @@ -53,70 +44,3 @@ impl FromStr for Protocol { } } } - -/// HTTP datagram frames -/// See: -pub struct Datagram { - /// Stream id divided by 4 - stream_id: StreamId, - /// The data contained in the datagram - payload: B, -} - -impl Datagram -where - B: Buf, -{ - /// Creates a new datagram frame - pub fn new(stream_id: StreamId, payload: B) -> Self { - assert!( - stream_id.into_inner() % 4 == 0, - "StreamId is not divisible by 4" - ); - Self { stream_id, payload } - } - - /// Decodes a datagram frame from the QUIC datagram - pub fn decode(mut buf: B) -> Result { - let q_stream_id = VarInt::decode(&mut buf) - .map_err(|_| Code::H3_DATAGRAM_ERROR.with_cause("Malformed datagram frame"))?; - - //= https://www.rfc-editor.org/rfc/rfc9297#section-2.1 - // Quarter Stream ID: A variable-length integer that contains the value of the client-initiated bidirectional - // stream that this datagram is associated with divided by four (the division by four stems - // from the fact that HTTP requests are sent on client-initiated bidirectional streams, - // which have stream IDs that are divisible by four). The largest legal QUIC stream ID - // value is 262-1, so the largest legal value of the Quarter Stream ID field is 260-1. - // Receipt of an HTTP/3 Datagram that includes a larger value MUST be treated as an HTTP/3 - // connection error of type H3_DATAGRAM_ERROR (0x33). - let stream_id = StreamId::try_from(u64::from(q_stream_id) * 4) - .map_err(|_| Code::H3_DATAGRAM_ERROR.with_cause("Invalid stream id"))?; - - let payload = buf; - - Ok(Self { stream_id, payload }) - } - - #[inline] - /// Returns the associated stream id of the datagram - pub fn stream_id(&self) -> StreamId { - self.stream_id - } - - #[inline] - /// Returns the datagram payload - pub fn payload(&self) -> &B { - &self.payload - } - - /// Encode the datagram to wire format - pub fn encode(self, buf: &mut D) { - (VarInt::from(self.stream_id) / 4).encode(buf); - buf.put(self.payload); - } - - /// Returns the datagram payload - pub fn into_payload(self) -> B { - self.payload - } -} diff --git a/h3/src/proto/stream.rs b/h3/src/proto/stream.rs index 2d525167..bbea9378 100644 --- a/h3/src/proto/stream.rs +++ b/h3/src/proto/stream.rs @@ -134,7 +134,8 @@ impl StreamId { } } - pub(crate) fn into_inner(self) -> u64 { + #[allow(missing_docs)] + pub fn into_inner(self) -> u64 { self.0 } } diff --git a/h3/src/quic.rs b/h3/src/quic.rs index 7d8086fe..39e7217a 100644 --- a/h3/src/quic.rs +++ b/h3/src/quic.rs @@ -7,7 +7,6 @@ use std::task::{self, Poll}; use bytes::Buf; -use crate::ext::Datagram; pub use crate::proto::stream::{InvalidStreamId, StreamId}; pub use crate::stream::WriteBuf; @@ -59,33 +58,6 @@ pub trait Connection: OpenStreams { fn opener(&self) -> Self::OpenStreams; } -/// Extends the `Connection` trait for sending datagrams -/// -/// See: -pub trait SendDatagramExt { - /// The error type that can occur when sending a datagram - type Error: Into>; - - /// Send a datagram - fn send_datagram(&mut self, data: Datagram) -> Result<(), Self::Error>; -} - -/// Extends the `Connection` trait for receiving datagrams -/// -/// See: -pub trait RecvDatagramExt { - /// The type of `Buf` for *raw* datagrams (without the stream_id decoded) - type Buf: Buf; - /// The error type that can occur when receiving a datagram - type Error: Into>; - - /// Poll the connection for incoming datagrams. - fn poll_accept_datagram( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>>; -} - /// Trait for opening outgoing streams pub trait OpenStreams { /// The type produced by `poll_open_bidi()` diff --git a/h3/src/server/connection.rs b/h3/src/server/connection.rs index 9d9721a8..c924f83f 100644 --- a/h3/src/server/connection.rs +++ b/h3/src/server/connection.rs @@ -5,7 +5,6 @@ use std::{ collections::HashSet, future::poll_fn, - marker::PhantomData, option::Option, result::Result, sync::Arc, @@ -21,14 +20,13 @@ use tokio::sync::mpsc; use crate::{ connection::{self, ConnectionInner, ConnectionState, SharedStateRef}, error::{Code, Error, ErrorLevel}, - ext::Datagram, frame::{FrameStream, FrameStreamError}, proto::{ frame::{Frame, PayloadLen}, push::PushId, }, qpack, - quic::{self, RecvDatagramExt, SendDatagramExt, SendStream as _}, + quic::{self, SendStream as _}, stream::BufRecvStream, }; @@ -37,7 +35,7 @@ use crate::server::request::ResolveRequest; #[cfg(feature = "tracing")] use tracing::{instrument, trace, warn}; -use super::stream::{ReadDatagram, RequestStream}; +use super::stream::RequestStream; /// Server connection driver /// @@ -419,40 +417,6 @@ where } } -impl Connection -where - C: quic::Connection + SendDatagramExt, - B: Buf, -{ - /// Sends a datagram - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - pub fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error> { - self.inner - .conn - .send_datagram(Datagram::new(stream_id, data))?; - - #[cfg(feature = "tracing")] - tracing::info!("Sent datagram"); - - Ok(()) - } -} - -impl Connection -where - C: quic::Connection + RecvDatagramExt, - B: Buf, -{ - /// Reads an incoming datagram - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - pub fn read_datagram(&mut self) -> ReadDatagram { - ReadDatagram { - conn: self, - _marker: PhantomData, - } - } -} - impl Drop for Connection where C: quic::Connection, diff --git a/h3/src/server/mod.rs b/h3/src/server/mod.rs index a4663d5b..925f3666 100644 --- a/h3/src/server/mod.rs +++ b/h3/src/server/mod.rs @@ -58,5 +58,4 @@ mod stream; pub use builder::builder; pub use builder::Builder; pub use connection::Connection; -pub use stream::ReadDatagram; pub use stream::RequestStream; diff --git a/h3/src/server/stream.rs b/h3/src/server/stream.rs index 1d8a42f4..6d36c74b 100644 --- a/h3/src/server/stream.rs +++ b/h3/src/server/stream.rs @@ -4,14 +4,12 @@ use bytes::Buf; use crate::{ connection::{ConnectionState, SharedStateRef}, - ext::Datagram, - quic::{self, RecvDatagramExt}, + quic::{self}, Error, }; -use pin_project_lite::pin_project; -use super::connection::{Connection, RequestEnd}; -use std::{marker::PhantomData, sync::Arc}; +use super::connection::RequestEnd; +use std::sync::Arc; use std::{ option::Option, @@ -20,7 +18,7 @@ use std::{ }; use bytes::BytesMut; -use futures_util::{future, future::Future, ready}; +use futures_util::future; use http::{response, HeaderMap, Response}; use quic::StreamId; @@ -225,33 +223,3 @@ impl Drop for RequestEnd { } } } - -pin_project! { - /// Future for [`Connection::read_datagram`] - pub struct ReadDatagram<'a, C, B> - where - C: quic::Connection, - B: Buf, - { - pub(super) conn: &'a mut Connection, - pub(super) _marker: PhantomData, - } -} - -impl<'a, C, B> Future for ReadDatagram<'a, C, B> -where - C: quic::Connection + RecvDatagramExt, - B: Buf, -{ - type Output = Result>, Error>; - - fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - #[cfg(feature = "tracing")] - tracing::trace!("poll: read_datagram"); - - match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? { - Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))), - None => Poll::Ready(Ok(None)), - } - } -}