Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plug ovm codec into ImportOpCommand #12759

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 32 additions & 11 deletions crates/net/downloaders/src/file_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, io, path::Path};
use std::{collections::HashMap, fmt::Debug, io, path::Path};

use alloy_consensus::BlockHeader;
use alloy_eips::BlockHashOrNumber;
Expand All @@ -18,7 +18,7 @@ use reth_primitives_traits::{Block, BlockBody, FullBlock};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tokio_util::codec::{Decoder, FramedRead};
use tracing::{debug, trace, warn};

use super::file_codec::BlockFileCodec;
Expand All @@ -41,7 +41,7 @@ pub const DEFAULT_BYTE_LEN_CHUNK_CHAIN_FILE: u64 = 1_000_000_000;
///
/// This reads the entire file into memory, so it is not suitable for large files.
#[derive(Debug)]
pub struct FileClient<B: Block = reth_primitives::Block> {
pub struct FileClient<B: Block = reth_primitives::Block, C: Decoder<Item = B> = BlockFileCodec<B>> {
/// The buffered headers retrieved when fetching new bodies.
headers: HashMap<BlockNumber, B::Header>,

Expand All @@ -50,6 +50,8 @@ pub struct FileClient<B: Block = reth_primitives::Block> {

/// The buffered bodies retrieved when fetching new headers.
bodies: HashMap<BlockHash, B::Body>,

_codec: std::marker::PhantomData<C>,
}

/// An error that can occur when constructing and using a [`FileClient`].
Expand All @@ -74,7 +76,10 @@ impl From<&'static str> for FileClientError {
}
}

impl<B: FullBlock> FileClient<B> {
impl<B: FullBlock, C> FileClient<B, C>
where
C: Decoder<Item = B, Error = FileClientError> + Default,
{
/// Create a new file client from a file path.
pub async fn new<P: AsRef<Path>>(path: P) -> Result<Self, FileClientError> {
let file = File::open(path).await?;
Expand Down Expand Up @@ -182,7 +187,10 @@ impl<B: FullBlock> FileClient<B> {
}
}

impl<B: FullBlock> FromReader for FileClient<B> {
impl<B: FullBlock, C> FromReader for FileClient<B, C>
where
C: Decoder<Item = B, Error = FileClientError> + Default,
{
type Error = FileClientError;

/// Initialize the [`FileClient`] from bytes that have been read from file.
Expand All @@ -198,8 +206,7 @@ impl<B: FullBlock> FromReader for FileClient<B> {
let mut bodies = HashMap::default();

// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream =
FramedRead::with_capacity(reader, BlockFileCodec::<B>::default(), num_bytes as usize);
let mut stream = FramedRead::with_capacity(reader, C::default(), num_bytes as usize);

trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
Expand Down Expand Up @@ -254,15 +261,23 @@ impl<B: FullBlock> FromReader for FileClient<B> {
trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

Ok(DecodedFileChunk {
file_client: Self { headers, hash_to_number, bodies },
file_client: Self {
headers,
hash_to_number,
bodies,
_codec: std::marker::PhantomData,
},
remaining_bytes,
highest_block: None,
})
}
}
}

impl<B: FullBlock> HeadersClient for FileClient<B> {
impl<B: FullBlock, C> HeadersClient for FileClient<B, C>
where
C: Decoder<Item = B, Error = FileClientError> + Sync + Send + Debug,
{
type Header = B::Header;
type Output = HeadersFut<B::Header>;

Expand Down Expand Up @@ -313,7 +328,10 @@ impl<B: FullBlock> HeadersClient for FileClient<B> {
}
}

impl<B: FullBlock> BodiesClient for FileClient<B> {
impl<B: FullBlock, C> BodiesClient for FileClient<B, C>
where
C: Decoder<Item = B, Error = FileClientError> + Sync + Send + Debug,
{
type Body = B::Body;
type Output = BodiesFut<B::Body>;

Expand All @@ -338,7 +356,10 @@ impl<B: FullBlock> BodiesClient for FileClient<B> {
}
}

impl<B: FullBlock> DownloadClient for FileClient<B> {
impl<B: FullBlock, C> DownloadClient for FileClient<B, C>
where
C: Decoder<Item = B, Error = FileClientError> + Sync + Send + Debug,
{
fn report_bad_message(&self, _peer_id: PeerId) {
warn!("Reported a bad message on a file client, the file may be corrupted or invalid");
// noop
Expand Down
3 changes: 2 additions & 1 deletion crates/net/downloaders/src/file_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use tokio_util::codec::{Decoder, Encoder};
///
/// It's recommended to use [`with_capacity`](tokio_util::codec::FramedRead::with_capacity) to set
/// the capacity of the framed reader to the size of the file.
pub(crate) struct BlockFileCodec<B>(std::marker::PhantomData<B>);
#[derive(Debug)]
pub struct BlockFileCodec<B>(std::marker::PhantomData<B>);

impl<B> Default for BlockFileCodec<B> {
fn default() -> Self {
Expand Down
3 changes: 2 additions & 1 deletion crates/node/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

use core::{fmt::Debug, marker::PhantomData};
pub use reth_primitives_traits::{
Block, BlockBody, FullBlock, FullNodePrimitives, FullReceipt, FullSignedTx, NodePrimitives,
Block, BlockBody, FillTxEnv, FullBlock, FullNodePrimitives, FullReceipt, FullSignedTx,
InMemorySize, NodePrimitives, SignedTransaction,
};

use reth_chainspec::EthChainSpec;
Expand Down
8 changes: 8 additions & 0 deletions crates/optimism/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ reth-execution-types.workspace = true
reth-node-core.workspace = true
reth-optimism-node.workspace = true
reth-primitives.workspace = true
reth-codecs.workspace = true

# so jemalloc metrics can be included
reth-node-metrics.workspace = true
Expand All @@ -52,7 +53,11 @@ alloy-consensus = { workspace = true, optional = true }
alloy-primitives.workspace = true
alloy-rlp.workspace = true

revm-primitives.workspace = true

# misc
bytes.workspace = true
arbitrary = { workspace = true, features = ["derive"] }
futures-util.workspace = true
derive_more = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
Expand All @@ -79,8 +84,11 @@ tempfile.workspace = true
reth-stages = { workspace = true, features = ["test-utils"] }
reth-db-common.workspace = true
reth-cli-commands.workspace = true
reth-codecs = { workspace = true, features = ["test-utils"] }
arbitrary = { workspace = true, features = ["derive"] }

[features]
default = ['optimism']
optimism = [
"op-alloy-consensus",
"alloy-consensus",
Expand Down
6 changes: 4 additions & 2 deletions crates/optimism/cli/src/commands/build_pipeline.rs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_import_pipeline fails to compile with the error

error[E0271]: type mismatch resolving `<DatabaseProvider<<<N as NodeTypesWithDB>::DB as Database>::TXMut, N> as BlockWriter>::Body == BlockBody`
   --> crates/optimism/cli/src/commands/build_pipeline.rs:95:14
    |
95  |             .builder()
    |              ^^^^^^^ expected `BlockBody`, found a different `BlockBody`
    |
    = note: `BlockBody` and `BlockBody` have similar names, but are actually distinct types
note: `BlockBody` is defined in crate `reth_primitives`
   --> /home/dkathiriya/projects/reth/crates/primitives/src/block.rs:591:1
    |
591 | pub struct BlockBody {
    | ^^^^^^^^^^^^^^^^^^^^
note: `BlockBody` is defined in the current crate
   --> crates/optimism/cli/src/ovm_file_codec.rs:102:1
    |
102 | pub struct BlockBody {
    | ^^^^^^^^^^^^^^^^^^^^
    = note: required for `BodyStage<TaskDownloader<BlockBody>>` to implement `reth_stages::Stage<DatabaseProvider<<<N as NodeTypesWithDB>::DB as reth_db::Database>::TXMut, N>>`
    = note: required for `OnlineStages<ProviderFactory<N>, ..., ...>` to implement `StageSet<DatabaseProvider<<<N as NodeTypesWithDB>::DB as reth_db::Database>::TXMut, N>>`
    = note: 1 redundant requirement hidden
    = note: required for `DefaultStages<ProviderFactory<...>, ..., ..., ...>` to implement `StageSet<DatabaseProvider<<<N as NodeTypesWithDB>::DB as reth_db::Database>::TXMut, N>>`

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume fixing this would require a large refactor of DatabaseProvider?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DatabaseProvider is generic over N: NodeTypes, so should be possible to use with custom N::Primitives, i.e. making a new type

pub struct OvmPrimitives {
    type BlockBody = ovm_file_codec::BlockBody;
    ..
}

and implementing the reth_primitives_traits::BlockBody trait for ovm_file_codec::BlockBody etc. then make a copy of the OpNode called OvmNode, replacing type Primitives = OpPrimitives with type Primitives = OvmPrimitives.

if you could impl these changes now, it's not sure if it will be able to take effect right away, since the complete integration of generic data primitives is pending #12454. however this pr will definitely be merged once it compiles.

Copy link
Contributor Author

@lakshya-sky lakshya-sky Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking maybe move ovm_file_codec::{Block, BlockBody, *} to reth_optimism_primitives and update OpPrimitives itself with the new primitives. That way we won't have to introduce OvmNode and OvmPrimitives?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpNode is EVM since the bedrock hardfork, that's why we are importing the OVM part of the op mainnet chain by this command as opposed to the traditional way: syncing from peers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ use reth_static_file::StaticFileProducer;
use std::sync::Arc;
use tokio::sync::watch;

use crate::ovm_file_codec::{Block, BlockBody, OvmBlockFileCodec};

/// Builds import pipeline.
///
/// If configured to execute, all stages will run. Otherwise, only stages that don't require state
Expand All @@ -34,13 +36,13 @@ pub(crate) async fn build_import_pipeline<N, C>(
config: &Config,
provider_factory: ProviderFactory<N>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
file_client: Arc<FileClient<Block, OvmBlockFileCodec>>,
static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool,
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent>)>
where
N: CliNodeTypes + ProviderNodeTypes<ChainSpec = OpChainSpec>,
C: Consensus + 'static,
C: Consensus<alloy_consensus::Header, BlockBody> + 'static,
{
if !file_client.has_canonical_blocks() {
eyre::bail!("unable to import non canonical blocks");
Expand Down
9 changes: 7 additions & 2 deletions crates/optimism/cli/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use reth_static_file::StaticFileProducer;
use std::{path::PathBuf, sync::Arc};
use tracing::{debug, error, info};

use crate::commands::build_pipeline::build_import_pipeline;
use crate::{
commands::build_pipeline::build_import_pipeline,
ovm_file_codec::{Block, OvmBlockFileCodec},
};

/// Syncs RLP encoded blocks from a file.
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -65,7 +68,9 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> ImportOpCommand<C> {
let mut total_decoded_txns = 0;
let mut total_filtered_out_dup_txns = 0;

while let Some(mut file_client) = reader.next_chunk::<FileClient>().await? {
while let Some(mut file_client) =
reader.next_chunk::<FileClient<Block, OvmBlockFileCodec>>().await?
{
// create a new FileClient from chunk read from file
info!(target: "reth::cli",
"Importing chain file chunk"
Expand Down
Loading
Loading