Skip to content

Commit

Permalink
refactor: make RpcCall an IntoFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
DaniPopes committed Dec 29, 2024
1 parent 0371332 commit 16ffd80
Showing 1 changed file with 42 additions and 176 deletions.
218 changes: 42 additions & 176 deletions crates/rpc-client/src/call.rs
Original file line number Diff line number Diff line change
@@ -1,116 +1,15 @@
use alloy_json_rpc::{
transform_response, try_deserialize_ok, Request, RequestPacket, ResponsePacket, RpcParam,
RpcResult, RpcReturn,
transform_response, try_deserialize_ok, Request, ResponsePacket, RpcParam, RpcReturn,
};
use alloy_transport::{BoxTransport, IntoBoxTransport, RpcFut, TransportError, TransportResult};
use core::panic;
use futures::FutureExt;
use serde_json::value::RawValue;
use std::{
fmt,
future::Future,
future::{Future, IntoFuture},
marker::PhantomData,
pin::Pin,
task::{self, ready, Poll::Ready},
};
use tower::Service;

/// The states of the [`RpcCall`] future.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project(project = CallStateProj)]
enum CallState<Params>
where
Params: RpcParam,
{
Prepared {
request: Option<Request<Params>>,
connection: BoxTransport,
},
AwaitingResponse {
#[pin]
fut: <BoxTransport as Service<RequestPacket>>::Future,
},
Complete,
}

impl<Params> Clone for CallState<Params>
where
Params: RpcParam,
{
fn clone(&self) -> Self {
match self {
Self::Prepared { request, connection } => {
Self::Prepared { request: request.clone(), connection: connection.clone() }
}
_ => panic!("cloned after dispatch"),
}
}
}

impl<Params> fmt::Debug for CallState<Params>
where
Params: RpcParam,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
Self::Prepared { .. } => "Prepared",
Self::AwaitingResponse { .. } => "AwaitingResponse",
Self::Complete => "Complete",
})
}
}

impl<Params> Future for CallState<Params>
where
Params: RpcParam,
{
type Output = TransportResult<Box<RawValue>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
loop {
match self.as_mut().project() {
CallStateProj::Prepared { connection, request } => {
if let Err(e) =
task::ready!(Service::<RequestPacket>::poll_ready(connection, cx))
{
self.set(Self::Complete);
return Ready(RpcResult::Err(e));
}

let request = request.take().expect("no request");
debug!(method=%request.meta.method, id=%request.meta.id, "sending request");
trace!(params_ty=%std::any::type_name::<Params>(), ?request, "full request");
let request = request.serialize();
let fut = match request {
Ok(request) => {
trace!(request=%request.serialized(), "serialized request");
connection.call(request.into())
}
Err(err) => {
trace!(?err, "failed to serialize request");
self.set(Self::Complete);
return Ready(RpcResult::Err(TransportError::ser_err(err)));
}
};
self.set(Self::AwaitingResponse { fut });
}
CallStateProj::AwaitingResponse { fut } => {
let res = match task::ready!(fut.poll(cx)) {
Ok(ResponsePacket::Single(res)) => Ready(transform_response(res)),
Err(e) => Ready(RpcResult::Err(e)),
_ => panic!("received batch response from single request"),
};
self.set(Self::Complete);
return res;
}
CallStateProj::Complete => {
panic!("Polled after completion");
}
}
}
}
}

/// A prepared, but unsent, RPC call.
///
/// This is a future that will send the request when polled. It contains a
Expand All @@ -130,26 +29,21 @@ where
/// batch request must immediately erase the `Param` type to allow batching of
/// requests with different `Param` types, while the `RpcCall` may do so lazily.
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[pin_project::pin_project]
#[derive(Clone)]
pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output>
where
Params: RpcParam,
Map: FnOnce(Resp) -> Output,
{
#[pin]
state: CallState<Params>,
map: Option<Map>,
pub struct RpcCall<Params, Resp, Output = Resp, Map = fn(Resp) -> Output> {
request: Request<Params>,
connection: BoxTransport,
map: Map,
_pd: core::marker::PhantomData<fn() -> (Resp, Output)>,
}

impl<Params, Resp, Output, Map> core::fmt::Debug for RpcCall<Params, Resp, Output, Map>
impl<Params, Resp, Output, Map> fmt::Debug for RpcCall<Params, Resp, Output, Map>
where
Params: RpcParam,
Map: FnOnce(Resp) -> Output,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("RpcCall").field("state", &self.state).finish()
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RpcCall").finish_non_exhaustive()
}
}

Expand All @@ -158,13 +52,11 @@ where
Params: RpcParam,
{
#[doc(hidden)]
pub fn new(req: Request<Params>, connection: impl IntoBoxTransport) -> Self {
pub fn new(request: Request<Params>, connection: impl IntoBoxTransport) -> Self {
Self {
state: CallState::Prepared {
request: Some(req),
connection: connection.into_box_transport(),
},
map: Some(std::convert::identity),
request,
connection: connection.into_box_transport(),
map: std::convert::identity,
_pd: PhantomData,
}
}
Expand Down Expand Up @@ -193,24 +85,16 @@ where
where
NewMap: FnOnce(Resp) -> NewOutput,
{
RpcCall { state: self.state, map: Some(map), _pd: PhantomData }
RpcCall { request: self.request, connection: self.connection, map, _pd: PhantomData }
}

/// Returns `true` if the request is a subscription.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn is_subscription(&self) -> bool {
self.request().meta.is_subscription()
}

/// Set the request to be a non-standard subscription (i.e. not
/// "eth_subscribe").
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn set_is_subscription(&mut self) {
self.request_mut().meta.set_is_subscription();
}
Expand All @@ -224,49 +108,28 @@ where
///
/// This is useful for modifying the params after the request has been
/// prepared.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn params(&mut self) -> &mut Params {
&mut self.request_mut().params
}

/// Returns a reference to the request.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn request(&self) -> &Request<Params> {
let CallState::Prepared { request, .. } = &self.state else {
panic!("Cannot get request after request has been sent");
};
request.as_ref().expect("no request in prepared")
&self.request
}

/// Returns a mutable reference to the request.
///
/// # Panics
///
/// Panics if called after the request has been sent.
pub fn request_mut(&mut self) -> &mut Request<Params> {
let CallState::Prepared { request, .. } = &mut self.state else {
panic!("Cannot get request after request has been sent");
};
request.as_mut().expect("no request in prepared")
&mut self.request
}

/// Map the params of the request into a new type.
pub fn map_params<NewParams: RpcParam>(
self,
map: impl Fn(Params) -> NewParams,
map: impl FnOnce(Params) -> NewParams,
) -> RpcCall<NewParams, Resp, Output, Map> {
let CallState::Prepared { request, connection } = self.state else {
panic!("Cannot get request after request has been sent");
};
let request = request.expect("no request in prepared").map_params(map);
RpcCall {
state: CallState::Prepared { request: Some(request), connection },
request: self.request.map_params(map),
connection: self.connection,
map: self.map,
_pd: PhantomData,
}
Expand All @@ -285,13 +148,9 @@ where
///
/// Panics if called after the request has been polled.
pub fn into_owned_params(self) -> RpcCall<Params::Owned, Resp, Output, Map> {
let CallState::Prepared { request, connection } = self.state else {
panic!("Cannot get params after request has been sent");
};
let request = request.expect("no request in prepared").into_owned_params();

RpcCall {
state: CallState::Prepared { request: Some(request), connection },
request: self.request.into_owned_params(),
connection: self.connection,
map: self.map,
_pd: PhantomData,
}
Expand All @@ -302,30 +161,37 @@ impl<'a, Params, Resp, Output, Map> RpcCall<Params, Resp, Output, Map>
where
Params: RpcParam + 'a,
Resp: RpcReturn,
Output: 'static,
Output: 'a,
Map: FnOnce(Resp) -> Output + Send + 'a,
{
/// Convert this future into a boxed, pinned future, erasing its type.
pub fn boxed(self) -> RpcFut<'a, Output> {
Box::pin(self)
self.into_future()
}

async fn do_call(self) -> TransportResult<Output> {
let Self { request, mut connection, map, _pd: PhantomData } = self;
std::future::poll_fn(|cx| connection.poll_ready(cx)).await?;
let serialized_request = request.serialize().map_err(TransportError::ser_err)?;
let response_packet = connection.call(serialized_request.into()).await?;
let ResponsePacket::Single(response) = response_packet else {
panic!("received batch response from single request")
};
try_deserialize_ok(transform_response(response)).map(map)
}
}

impl<Params, Resp, Output, Map> Future for RpcCall<Params, Resp, Output, Map>
impl<'a, Params, Resp, Output, Map> IntoFuture for RpcCall<Params, Resp, Output, Map>
where
Params: RpcParam,
Params: RpcParam + 'a,
Resp: RpcReturn,
Output: 'static,
Map: FnOnce(Resp) -> Output,
Output: 'a,
Map: FnOnce(Resp) -> Output + Send + 'a,
{
type Output = TransportResult<Output>;

fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
trace!(?self.state, "polling RpcCall");

let this = self.get_mut();
let resp = try_deserialize_ok(ready!(this.state.poll_unpin(cx)));
type IntoFuture = RpcFut<'a, Output>;
type Output = <Self::IntoFuture as Future>::Output;

Ready(resp.map(this.map.take().expect("polled after completion")))
fn into_future(self) -> Self::IntoFuture {
Box::pin(self.do_call())
}
}

0 comments on commit 16ffd80

Please sign in to comment.