Skip to content

Commit

Permalink
chore: move beacon handle type (#13714)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Jan 7, 2025
1 parent 027f80e commit 818eb7d
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 133 deletions.
37 changes: 1 addition & 36 deletions crates/consensus/beacon/src/engine/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::engine::hooks::EngineHookError;
use alloy_rpc_types_engine::ForkchoiceUpdateError;
pub use reth_engine_primitives::BeaconForkChoiceUpdateError;
use reth_errors::{DatabaseError, RethError};
use reth_stages_api::PipelineError;

Expand Down Expand Up @@ -42,38 +42,3 @@ impl From<DatabaseError> for BeaconConsensusEngineError {
Self::Common(e.into())
}
}

/// Represents error cases for an applied forkchoice update.
///
/// This represents all possible error cases, that must be returned as JSON RPC errors back to the
/// beacon node.
#[derive(Debug, thiserror::Error)]
pub enum BeaconForkChoiceUpdateError {
/// Thrown when a forkchoice update resulted in an error.
#[error("forkchoice update error: {0}")]
ForkchoiceUpdateError(#[from] ForkchoiceUpdateError),
/// Thrown when the engine task is unavailable/stopped.
#[error("beacon consensus engine task stopped")]
EngineUnavailable,
/// An internal error occurred, not necessarily related to the update.
#[error(transparent)]
Internal(Box<dyn core::error::Error + Send + Sync>),
}

impl BeaconForkChoiceUpdateError {
/// Create a new internal error.
pub fn internal<E: core::error::Error + Send + Sync + 'static>(e: E) -> Self {
Self::Internal(Box::new(e))
}
}

impl From<RethError> for BeaconForkChoiceUpdateError {
fn from(e: RethError) -> Self {
Self::internal(e)
}
}
impl From<DatabaseError> for BeaconForkChoiceUpdateError {
fn from(e: DatabaseError) -> Self {
Self::internal(e)
}
}
93 changes: 1 addition & 92 deletions crates/consensus/beacon/src/engine/handle.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,3 @@
//! `BeaconConsensusEngine` external API
use crate::BeaconForkChoiceUpdateError;
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use futures::TryFutureExt;
use reth_engine_primitives::{
BeaconEngineMessage, BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes,
OnForkChoiceUpdated,
};
use reth_errors::RethResult;
use tokio::sync::{mpsc::UnboundedSender, oneshot};

/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
/// engine task.
///
/// See also `BeaconConsensusEngine`
#[derive(Debug, Clone)]
pub struct BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
}

// === impl BeaconConsensusEngineHandle ===

impl<Engine> BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
/// Creates a new beacon consensus engine handle.
pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
Self { to_engine }
}

/// Sends a new payload message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
pub async fn new_payload(
&self,
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}

/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
pub async fn fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs, version)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
}

/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
});
rx
}

/// Sends a transition configuration exchange message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
///
/// This only notifies about the exchange. The actual exchange is done by the engine API impl
/// itself.
pub fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}
}
pub use reth_engine_primitives::BeaconConsensusEngineHandle;
26 changes: 26 additions & 0 deletions crates/engine/primitives/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use alloy_rpc_types_engine::ForkchoiceUpdateError;

/// Represents all error cases when handling a new payload.
///
/// This represents all possible error cases that must be returned as JSON RCP errors back to the
Expand All @@ -18,3 +20,27 @@ impl BeaconOnNewPayloadError {
Self::Internal(Box::new(e))
}
}

/// Represents error cases for an applied forkchoice update.
///
/// This represents all possible error cases, that must be returned as JSON RPC errors back to the
/// beacon node.
#[derive(Debug, thiserror::Error)]
pub enum BeaconForkChoiceUpdateError {
/// Thrown when a forkchoice update resulted in an error.
#[error("forkchoice update error: {0}")]
ForkchoiceUpdateError(#[from] ForkchoiceUpdateError),
/// Thrown when the engine task is unavailable/stopped.
#[error("beacon consensus engine task stopped")]
EngineUnavailable,
/// An internal error occurred, not necessarily related to the update.
#[error(transparent)]
Internal(Box<dyn core::error::Error + Send + Sync>),
}

impl BeaconForkChoiceUpdateError {
/// Create a new internal error.
pub fn internal<E: core::error::Error + Send + Sync + 'static>(e: E) -> Self {
Self::Internal(Box::new(e))
}
}
4 changes: 2 additions & 2 deletions crates/engine/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use core::fmt;

use alloy_consensus::BlockHeader;
use alloy_rpc_types_engine::{ExecutionPayload, ExecutionPayloadSidecar, PayloadError};
pub use error::BeaconOnNewPayloadError;
pub use error::*;

mod forkchoice;
pub use forkchoice::{ForkchoiceStateHash, ForkchoiceStateTracker, ForkchoiceStatus};

mod message;
pub use message::{BeaconEngineMessage, OnForkChoiceUpdated};
pub use message::*;

mod invalid_block_hook;
pub use invalid_block_hook::InvalidBlockHook;
Expand Down
88 changes: 85 additions & 3 deletions crates/engine/primitives/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use crate::{BeaconOnNewPayloadError, EngineApiMessageVersion, EngineTypes, ForkchoiceStatus};
use crate::{
error::BeaconForkChoiceUpdateError, BeaconOnNewPayloadError, EngineApiMessageVersion,
EngineTypes, ForkchoiceStatus,
};
use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkChoiceUpdateResult, ForkchoiceState,
ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
};
use futures::{future::Either, FutureExt};
use futures::{future::Either, FutureExt, TryFutureExt};
use reth_errors::RethResult;
use reth_payload_builder_primitives::PayloadBuilderError;
use std::{
Expand All @@ -12,7 +15,7 @@ use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio::sync::{mpsc::UnboundedSender, oneshot};

/// Represents the outcome of forkchoice update.
///
Expand Down Expand Up @@ -191,3 +194,82 @@ impl<Engine: EngineTypes> Display for BeaconEngineMessage<Engine> {
}
}
}

/// A clonable sender type that can be used to send engine API messages.
///
/// This type mirrors consensus related functions of the engine API.
#[derive(Debug, Clone)]
pub struct BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
}

impl<Engine> BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
/// Creates a new beacon consensus engine handle.
pub const fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
Self { to_engine }
}

/// Sends a new payload message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
pub async fn new_payload(
&self,
payload: ExecutionPayload,
sidecar: ExecutionPayloadSidecar,
) -> Result<PayloadStatus, BeaconOnNewPayloadError> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::NewPayload { payload, sidecar, tx });
rx.await.map_err(|_| BeaconOnNewPayloadError::EngineUnavailable)?
}

/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
pub async fn fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs, version)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await?
.map_err(BeaconForkChoiceUpdateError::internal)?
.await?)
}

/// Sends a forkchoice update message to the beacon consensus engine and returns the receiver to
/// wait for a response.
fn send_fork_choice_updated(
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
});
rx
}

/// Sends a transition configuration exchange message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
///
/// This only notifies about the exchange. The actual exchange is done by the engine API impl
/// itself.
pub fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}
}

0 comments on commit 818eb7d

Please sign in to comment.