From 6bf3c97259112e3849af6fc87e0ab40fa7384d3d Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Tue, 21 May 2024 12:49:40 +0200 Subject: [PATCH 01/13] init --- crates/sui-bridge-indexer/Cargo.toml | 32 ++++++++ crates/sui-bridge-indexer/diesel.toml | 9 +++ crates/sui-bridge-indexer/src/config.rs | 47 +++++++++++ crates/sui-bridge-indexer/src/main.rs | 71 ++++++++++++++++ .../2024-05-16-134945_tokens/down.sql | 2 + .../2024-05-16-134945_tokens/up.sql | 10 +++ crates/sui-bridge-indexer/src/mod.rs | 8 ++ crates/sui-bridge-indexer/src/models.rs | 17 ++++ .../sui-bridge-indexer/src/postgres_writer.rs | 31 +++++++ crates/sui-bridge-indexer/src/schema.rs | 15 ++++ crates/sui-bridge-indexer/src/worker.rs | 80 +++++++++++++++++++ 11 files changed, 322 insertions(+) create mode 100644 crates/sui-bridge-indexer/Cargo.toml create mode 100644 crates/sui-bridge-indexer/diesel.toml create mode 100644 crates/sui-bridge-indexer/src/config.rs create mode 100644 crates/sui-bridge-indexer/src/main.rs create mode 100644 crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql create mode 100644 crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql create mode 100644 crates/sui-bridge-indexer/src/mod.rs create mode 100644 crates/sui-bridge-indexer/src/models.rs create mode 100644 crates/sui-bridge-indexer/src/postgres_writer.rs create mode 100644 crates/sui-bridge-indexer/src/schema.rs create mode 100644 crates/sui-bridge-indexer/src/worker.rs diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml new file mode 100644 index 0000000000000..ae1d17f57f310 --- /dev/null +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "sui-bridge-indexer" +version = "0.1.0" +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] } +ethers = "2.0" +tokio = { workspace = true, features = ["full"] } +sui-types.workspace = true +prometheus.workspace = true +async-trait.workspace = true +sui-data-ingestion-core.workspace = true +sui-bridge.workspace = true +clap.workspace = true +tracing.workspace = true +bin-version.workspace = true +anyhow.workspace = true + +[dev-dependencies] +sui-types = { workspace = true, features = ["test-utils"] } +sui-config.workspace = true +sui-test-transaction-builder.workspace = true +test-cluster.workspace = true +hex-literal = "0.3.4" + +[[bin]] +name = "bridge-indexer" +path = "src/main.rs" diff --git a/crates/sui-bridge-indexer/diesel.toml b/crates/sui-bridge-indexer/diesel.toml new file mode 100644 index 0000000000000..d70c5a10f7c45 --- /dev/null +++ b/crates/sui-bridge-indexer/diesel.toml @@ -0,0 +1,9 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/indexer/schema.rs" +custom_type_derives = ["diesel::query_builder::QueryId"] + +[migrations_directory] +dir = "src/indexer/migrations" diff --git a/crates/sui-bridge-indexer/src/config.rs b/crates/sui-bridge-indexer/src/config.rs new file mode 100644 index 0000000000000..e5c0ec341face --- /dev/null +++ b/crates/sui-bridge-indexer/src/config.rs @@ -0,0 +1,47 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use clap::*; +use std::path::PathBuf; + +#[derive(Parser, Clone, Debug)] +#[clap( + name = "Bridge Indexer", + about = "Run an indexer for the bridge.\n\ + It uses the data ingestion framework to read sui transactions and listens\n\ + to Ethereum events in order to generate data related to the bridge.\n\ + Data is written to postgres tables and can be used for dashboards and general checks\n\ + on bridge health.", + rename_all = "kebab-case" +)] +pub struct BridgeIndexerConfig { + /// URL of the sui remote store. + #[clap(long, short = 'r', required = true)] + pub remote_store_url: Option, + /// URL for Eth fullnode. + #[clap(long, short = 'e', required = true)] + pub eth_rpc_url: String, + /// URL of the DB instance holding indexed bridge data. + #[clap(long, short = 'd', required = true)] + pub db_url: String, + /// Path to the file where the progress store is stored. + #[clap( + long, + short = 'p', + default_value = "/tmp/progress_store", + global = true + )] + pub progress_store_file: PathBuf, + /// Path to the directory where the checkpoints are stored. + #[clap(long, short = 'c', default_value = "/tmp", global = true)] + pub checkpoints_path: PathBuf, + /// Number of worker threads to use. + #[clap(long, short = 't', default_value = "1", global = true)] + pub concurrency: usize, + /// Address of the SuiBridge contract + #[clap(long, required = true)] + pub eth_sui_bridge_contract_address: String, + /// Block to start indexing from + #[clap(long, required = true)] + pub start_block: u64, +} diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs new file mode 100644 index 0000000000000..f64b9dee8c742 --- /dev/null +++ b/crates/sui-bridge-indexer/src/main.rs @@ -0,0 +1,71 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use clap::Parser; +use ethers::types::Address as EthAddress; +use prometheus::Registry; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use sui_bridge::{ + eth_client::EthClient, + eth_syncer::EthSyncer, + indexer::{config::BridgeIndexerConfig, worker::BridgeWorker}, +}; +use sui_data_ingestion_core::{ + DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool, +}; +use tokio::sync::oneshot; +use tracing::info; + +#[tokio::main] +async fn main() -> Result<()> { + let config = BridgeIndexerConfig::parse(); + info!("Parsed config: {:#?}", config); + + // start sui side + let (_exit_sender, exit_receiver) = oneshot::channel(); + let metrics = DataIngestionMetrics::new(&Registry::new()); + let progress_store = FileProgressStore::new(config.progress_store_file); + let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics); + let worker_pool = WorkerPool::new( + BridgeWorker::new(vec![], config.db_url), + "bridge worker".into(), + config.concurrency, + ); + executor.register(worker_pool).await?; + executor + .run( + config.checkpoints_path, + config.remote_store_url, + vec![], // optional remote store access options + ReaderOptions::default(), + exit_receiver, + ) + .await?; + + // start eth side + let eth_client = Arc::new( + EthClient::::new( + &config.eth_rpc_url, + HashSet::from_iter(vec![ + // Define in config? + // bridge_proxy_address, + // committee_address, + // config_address, + // limiter_address, + // vault_address, + ]), + ) + .await?, + ); + let contract_addresses: HashMap = HashMap::new(); + let mut all_handles = vec![]; + let (task_handles, _eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) + .run() + .await + .expect("Failed to start eth syncer"); + all_handles.extend(task_handles); + + Ok(()) +} diff --git a/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql b/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql new file mode 100644 index 0000000000000..8f0504ea315cb --- /dev/null +++ b/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS tokens; diff --git a/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql b/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql new file mode 100644 index 0000000000000..682e04232999b --- /dev/null +++ b/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql @@ -0,0 +1,10 @@ +-- Your SQL goes here +CREATE TABLE tokens +( + message_key bytea PRIMARY KEY, + checkpoint bigint NOT NULL, + epoch bigint NOT NULL, + token_type int NOT NULL, + source_chain int NOT NULL, + destination_chain int NOT NULL +); \ No newline at end of file diff --git a/crates/sui-bridge-indexer/src/mod.rs b/crates/sui-bridge-indexer/src/mod.rs new file mode 100644 index 0000000000000..c840d78b0a8e7 --- /dev/null +++ b/crates/sui-bridge-indexer/src/mod.rs @@ -0,0 +1,8 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +pub mod config; +pub mod models; +pub mod postgres_writer; +pub mod schema; +pub mod worker; diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs new file mode 100644 index 0000000000000..cc28c6d35d794 --- /dev/null +++ b/crates/sui-bridge-indexer/src/models.rs @@ -0,0 +1,17 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::indexer::schema::tokens; +use diesel::prelude::*; + +#[derive(Queryable, Selectable, Insertable, AsChangeset, Debug)] +#[diesel(table_name = tokens)] +#[diesel(check_for_backend(diesel::pg::Pg))] +pub struct TokenTxn { + pub message_key: Vec, + pub checkpoint: i64, + pub epoch: i64, + pub token_type: i32, + pub source_chain: i32, + pub destination_chain: i32, +} diff --git a/crates/sui-bridge-indexer/src/postgres_writer.rs b/crates/sui-bridge-indexer/src/postgres_writer.rs new file mode 100644 index 0000000000000..573e611ef9a22 --- /dev/null +++ b/crates/sui-bridge-indexer/src/postgres_writer.rs @@ -0,0 +1,31 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::indexer::{models::TokenTxn, schema::tokens}; +use diesel::{ + pg::PgConnection, + r2d2::{ConnectionManager, Pool}, + Connection, RunQueryDsl, +}; + +pub(crate) type PgPool = Pool>; + +pub(crate) fn get_connection_pool(database_url: String) -> PgPool { + let manager = ConnectionManager::::new(database_url); + Pool::builder() + .test_on_check_out(true) + .build(manager) + .expect("Could not build Postgres DB connection pool") +} + +pub(crate) fn write(pool: &PgPool, token: TokenTxn) { + let connection = &mut pool.get().unwrap(); + connection + .transaction(|conn| { + diesel::insert_into(tokens::table) + .values(token) + .on_conflict_do_nothing() + .execute(conn) + }) + .expect("Failed to start connection to DB"); +} diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs new file mode 100644 index 0000000000000..4121f496670c9 --- /dev/null +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -0,0 +1,15 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// @generated automatically by Diesel CLI. + +diesel::table! { + tokens (message_key) { + message_key -> Bytea, + checkpoint -> Int8, + epoch -> Int8, + token_type -> Int4, + source_chain -> Int4, + destination_chain -> Int4, + } +} diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs new file mode 100644 index 0000000000000..d6da80ae42af9 --- /dev/null +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -0,0 +1,80 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::indexer::models::TokenTxn; +use crate::indexer::postgres_writer::{get_connection_pool, write, PgPool}; +use anyhow::Result; +use async_trait::async_trait; +use std::collections::BTreeSet; +use sui_data_ingestion_core::Worker; +use sui_types::{ + base_types::ObjectID, + full_checkpoint_content::{CheckpointData, CheckpointTransaction}, + transaction::{TransactionDataAPI, TransactionKind}, + SUI_BRIDGE_OBJECT_ID, +}; +use tracing::info; + +pub struct BridgeWorker { + bridge_object_ids: BTreeSet, + pg_pool: PgPool, +} + +impl BridgeWorker { + pub fn new(bridge_object_ids: Vec, db_url: String) -> Self { + let mut bridge_object_ids = bridge_object_ids.into_iter().collect::>(); + bridge_object_ids.insert(SUI_BRIDGE_OBJECT_ID); + let pg_pool = get_connection_pool(db_url); + Self { + bridge_object_ids, + pg_pool, + } + } + + // Return true if the transaction relates to the bridge and is of interest. + fn is_bridge_transaction(&self, tx: &CheckpointTransaction) -> bool { + // TODO: right now this returns true for programmable transactions that + // have the bridge object as input. We can extend later to cover other cases + let txn_data = tx.transaction.transaction_data(); + if let TransactionKind::ProgrammableTransaction(_pt) = txn_data.kind() { + return tx + .input_objects + .iter() + .any(|obj| self.bridge_object_ids.contains(&obj.id())); + }; + false + } + + // Process a transaction that has been identified as a bridge transaction. + fn process_transaction(&self, tx: &CheckpointTransaction, epoch: u64, checkpoint: u64) { + let token_txn = TokenTxn { + message_key: tx.transaction.digest().inner().to_vec(), + checkpoint: checkpoint as i64, + epoch: epoch as i64, + token_type: 4, + source_chain: 2, + destination_chain: 3, + }; + write(&self.pg_pool, token_txn); + } +} + +#[async_trait] +impl Worker for BridgeWorker { + async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> { + info!( + "Processing checkpoint [{}] {}: {}", + checkpoint.checkpoint_summary.epoch, + checkpoint.checkpoint_summary.sequence_number, + checkpoint.transactions.len(), + ); + let epoch = checkpoint.checkpoint_summary.epoch; + let checkpoint_num = checkpoint.checkpoint_summary.sequence_number; + checkpoint + .transactions + .iter() + .filter(|txn| self.is_bridge_transaction(txn)) + .for_each(|txn| self.process_transaction(txn, epoch, checkpoint_num)); + Ok(()) + } +} From 445feb777118d814a150c3b02aa78c4cbe8de2fb Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Tue, 21 May 2024 13:08:47 +0200 Subject: [PATCH 02/13] update eth client ref --- crates/sui-bridge-indexer/src/main.rs | 29 +++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index f64b9dee8c742..f4e9ce32d9067 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -44,22 +44,35 @@ async fn main() -> Result<()> { ) .await?; - // start eth side + let sui_bridge = EthSuiBridge::new(bridge_proxy_address, provider.clone()); + let committee_address: EthAddress = sui_bridge.committee().call().await?; + let limiter_address: EthAddress = sui_bridge.limiter().call().await?; + let vault_address: EthAddress = sui_bridge.vault().call().await?; + let committee = EthBridgeCommittee::new(committee_address, provider.clone()); + let config_address: EthAddress = committee.config().call().await?; + + // start eth client let eth_client = Arc::new( EthClient::::new( &config.eth_rpc_url, HashSet::from_iter(vec![ - // Define in config? - // bridge_proxy_address, - // committee_address, - // config_address, - // limiter_address, - // vault_address, + bridge_proxy_address, + committee_address, + config_address, + limiter_address, + vault_address, ]), ) .await?, ); - let contract_addresses: HashMap = HashMap::new(); + let contract_addresses = vec![ + bridge_proxy_address, + committee_address, + config_address, + limiter_address, + vault_address, + ]; + let mut all_handles = vec![]; let (task_handles, _eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) .run() From ece13b98e270d09f649d1e035fdd5de3b6e38c01 Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Tue, 21 May 2024 15:45:01 +0200 Subject: [PATCH 03/13] Fix build --- Cargo.lock | 22 ++++++++++ Cargo.toml | 1 + .../sui-bridge-indexer/src/{mod.rs => lib.rs} | 0 crates/sui-bridge-indexer/src/main.rs | 40 ++++++++++++------- crates/sui-bridge-indexer/src/models.rs | 2 +- .../sui-bridge-indexer/src/postgres_writer.rs | 2 +- crates/sui-bridge-indexer/src/worker.rs | 4 +- 7 files changed, 52 insertions(+), 19 deletions(-) rename crates/sui-bridge-indexer/src/{mod.rs => lib.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index df8d287b72a85..35ae80cffdb5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12038,6 +12038,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "sui-bridge-indexer" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bin-version", + "clap", + "diesel", + "ethers", + "hex-literal 0.3.4", + "prometheus", + "sui-bridge", + "sui-config", + "sui-data-ingestion-core", + "sui-test-transaction-builder", + "sui-types", + "test-cluster", + "tokio", + "tracing", +] + [[package]] name = "sui-cluster-test" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 247d3ba5b003f..1393c3514ea34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ members = [ "crates/sui-benchmark", "crates/sui-bridge", "crates/sui-bridge-cli", + "crates/sui-bridge-indexer", "crates/sui-cluster-test", "crates/sui-common", "crates/sui-config", diff --git a/crates/sui-bridge-indexer/src/mod.rs b/crates/sui-bridge-indexer/src/lib.rs similarity index 100% rename from crates/sui-bridge-indexer/src/mod.rs rename to crates/sui-bridge-indexer/src/lib.rs diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index f4e9ce32d9067..6302dce156205 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -5,13 +5,16 @@ use anyhow::Result; use clap::Parser; use ethers::types::Address as EthAddress; use prometheus::Registry; -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; +use std::collections::HashSet; +use std::str::FromStr; use std::sync::Arc; use sui_bridge::{ + abi::{EthBridgeCommittee, EthSuiBridge}, eth_client::EthClient, eth_syncer::EthSyncer, - indexer::{config::BridgeIndexerConfig, worker::BridgeWorker}, }; +use sui_bridge_indexer::{config::BridgeIndexerConfig, worker::BridgeWorker}; use sui_data_ingestion_core::{ DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool, }; @@ -43,8 +46,13 @@ async fn main() -> Result<()> { exit_receiver, ) .await?; - - let sui_bridge = EthSuiBridge::new(bridge_proxy_address, provider.clone()); + let provider = Arc::new( + ethers::prelude::Provider::::try_from(&config.eth_rpc_url) + .unwrap() + .interval(std::time::Duration::from_millis(2000)), + ); + let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?; + let sui_bridge = EthSuiBridge::new(bridge_address, provider.clone()); let committee_address: EthAddress = sui_bridge.committee().call().await?; let limiter_address: EthAddress = sui_bridge.limiter().call().await?; let vault_address: EthAddress = sui_bridge.vault().call().await?; @@ -56,7 +64,7 @@ async fn main() -> Result<()> { EthClient::::new( &config.eth_rpc_url, HashSet::from_iter(vec![ - bridge_proxy_address, + bridge_address, committee_address, config_address, limiter_address, @@ -65,20 +73,22 @@ async fn main() -> Result<()> { ) .await?, ); - let contract_addresses = vec![ - bridge_proxy_address, - committee_address, - config_address, - limiter_address, - vault_address, - ]; + let contract_addresses = HashMap::from_iter(vec![ + (bridge_address, config.start_block), + (committee_address, config.start_block), + (config_address, config.start_block), + (limiter_address, config.start_block), + (vault_address, config.start_block), + ]); - let mut all_handles = vec![]; - let (task_handles, _eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) + let (_task_handles, _eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) .run() .await .expect("Failed to start eth syncer"); - all_handles.extend(task_handles); + + // eth_events_rx.recv().await { + // println!("Received eth event"); + // }; Ok(()) } diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index cc28c6d35d794..c55fec422bdce 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::indexer::schema::tokens; +use crate::schema::tokens; use diesel::prelude::*; #[derive(Queryable, Selectable, Insertable, AsChangeset, Debug)] diff --git a/crates/sui-bridge-indexer/src/postgres_writer.rs b/crates/sui-bridge-indexer/src/postgres_writer.rs index 573e611ef9a22..04db4e645330d 100644 --- a/crates/sui-bridge-indexer/src/postgres_writer.rs +++ b/crates/sui-bridge-indexer/src/postgres_writer.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::indexer::{models::TokenTxn, schema::tokens}; +use crate::{models::TokenTxn, schema::tokens}; use diesel::{ pg::PgConnection, r2d2::{ConnectionManager, Pool}, diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index d6da80ae42af9..5251e16448b6f 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -1,8 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::indexer::models::TokenTxn; -use crate::indexer::postgres_writer::{get_connection_pool, write, PgPool}; +use crate::models::TokenTxn; +use crate::postgres_writer::{get_connection_pool, write, PgPool}; use anyhow::Result; use async_trait::async_trait; use std::collections::BTreeSet; From 2b602e83f6cf408bdf2c40c56e46a3a61e08bf6d Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Tue, 21 May 2024 16:30:51 +0200 Subject: [PATCH 04/13] fix eth event listener --- Cargo.lock | 1 + crates/sui-bridge-indexer/Cargo.toml | 1 + crates/sui-bridge-indexer/src/config.rs | 4 +- crates/sui-bridge-indexer/src/main.rs | 68 ++++++++++++++++--------- crates/sui-bridge-indexer/src/worker.rs | 1 + 5 files changed, 50 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35ae80cffdb5d..56b559b3a8ecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12049,6 +12049,7 @@ dependencies = [ "diesel", "ethers", "hex-literal 0.3.4", + "mysten-metrics", "prometheus", "sui-bridge", "sui-config", diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml index ae1d17f57f310..c4147e24c7abf 100644 --- a/crates/sui-bridge-indexer/Cargo.toml +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -19,6 +19,7 @@ clap.workspace = true tracing.workspace = true bin-version.workspace = true anyhow.workspace = true +mysten-metrics.workspace = true [dev-dependencies] sui-types = { workspace = true, features = ["test-utils"] } diff --git a/crates/sui-bridge-indexer/src/config.rs b/crates/sui-bridge-indexer/src/config.rs index e5c0ec341face..0f2ab39ac0064 100644 --- a/crates/sui-bridge-indexer/src/config.rs +++ b/crates/sui-bridge-indexer/src/config.rs @@ -39,9 +39,9 @@ pub struct BridgeIndexerConfig { #[clap(long, short = 't', default_value = "1", global = true)] pub concurrency: usize, /// Address of the SuiBridge contract - #[clap(long, required = true)] + #[clap(long, required = true, short = 'a')] pub eth_sui_bridge_contract_address: String, /// Block to start indexing from - #[clap(long, required = true)] + #[clap(long, required = true, short = 's')] pub start_block: u64, } diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 6302dce156205..7bfd637352a94 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -4,9 +4,14 @@ use anyhow::Result; use clap::Parser; use ethers::types::Address as EthAddress; +use mysten_metrics::spawn_logged_monitored_task; +use mysten_metrics::start_prometheus_server; use prometheus::Registry; use std::collections::HashMap; use std::collections::HashSet; +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use sui_bridge::{ @@ -14,6 +19,7 @@ use sui_bridge::{ eth_client::EthClient, eth_syncer::EthSyncer, }; + use sui_bridge_indexer::{config::BridgeIndexerConfig, worker::BridgeWorker}; use sui_data_ingestion_core::{ DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool, @@ -26,26 +32,18 @@ async fn main() -> Result<()> { let config = BridgeIndexerConfig::parse(); info!("Parsed config: {:#?}", config); - // start sui side + // start metrics server let (_exit_sender, exit_receiver) = oneshot::channel(); let metrics = DataIngestionMetrics::new(&Registry::new()); - let progress_store = FileProgressStore::new(config.progress_store_file); - let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics); - let worker_pool = WorkerPool::new( - BridgeWorker::new(vec![], config.db_url), - "bridge worker".into(), - config.concurrency, - ); - executor.register(worker_pool).await?; - executor - .run( - config.checkpoints_path, - config.remote_store_url, - vec![], // optional remote store access options - ReaderOptions::default(), - exit_receiver, - ) - .await?; + + // Init metrics server + let metrics_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1000); + let registry_service = start_prometheus_server(metrics_address); + let prometheus_registry = registry_service.default_registry(); + mysten_metrics::init_metrics(&prometheus_registry); + info!("Metrics server started at port {}", 1000); + + // start eth client let provider = Arc::new( ethers::prelude::Provider::::try_from(&config.eth_rpc_url) .unwrap() @@ -59,7 +57,6 @@ async fn main() -> Result<()> { let committee = EthBridgeCommittee::new(committee_address, provider.clone()); let config_address: EthAddress = committee.config().call().await?; - // start eth client let eth_client = Arc::new( EthClient::::new( &config.eth_rpc_url, @@ -81,14 +78,39 @@ async fn main() -> Result<()> { (vault_address, config.start_block), ]); - let (_task_handles, _eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) + let (_task_handles, mut eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) .run() .await .expect("Failed to start eth syncer"); - // eth_events_rx.recv().await { - // println!("Received eth event"); - // }; + let _task_handle = spawn_logged_monitored_task!( + async move { + while let Some(events) = eth_events_rx.recv().await { + println!("ETH: Received events: {:?}", events); + // TODO: process Eth event + } + }, + "indexer handler" + ); + + // start sui side + let progress_store = FileProgressStore::new(config.progress_store_file); + let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics); + let worker_pool = WorkerPool::new( + BridgeWorker::new(vec![], config.db_url), + "bridge worker".into(), + config.concurrency, + ); + executor.register(worker_pool).await?; + executor + .run( + config.checkpoints_path, + config.remote_store_url, + vec![], // optional remote store access options + ReaderOptions::default(), + exit_receiver, + ) + .await?; Ok(()) } diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index 5251e16448b6f..0aa0ceae747f7 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -47,6 +47,7 @@ impl BridgeWorker { // Process a transaction that has been identified as a bridge transaction. fn process_transaction(&self, tx: &CheckpointTransaction, epoch: u64, checkpoint: u64) { + println!("SUI: Processing transaction"); let token_txn = TokenTxn { message_key: tx.transaction.digest().inner().to_vec(), checkpoint: checkpoint as i64, From 10868b481845f87159a0b8ed70bca5a0b410e9ce Mon Sep 17 00:00:00 2001 From: patrick Date: Tue, 21 May 2024 17:32:15 +0200 Subject: [PATCH 05/13] db schema and token transfer data models --- crates/sui-bridge-indexer/diesel.toml | 6 +- crates/sui-bridge-indexer/src/lib.rs | 80 +++++++++++++++++++ .../down.sql | 3 + .../up.sql | 25 ++++++ .../2024-05-16-134945_tokens/down.sql | 2 - .../2024-05-16-134945_tokens/up.sql | 10 --- crates/sui-bridge-indexer/src/models.rs | 34 +++++--- .../sui-bridge-indexer/src/postgres_writer.rs | 17 +++- crates/sui-bridge-indexer/src/schema.rs | 27 +++++-- crates/sui-bridge-indexer/src/worker.rs | 23 +++--- 10 files changed, 181 insertions(+), 46 deletions(-) create mode 100644 crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql create mode 100644 crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql delete mode 100644 crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql delete mode 100644 crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql diff --git a/crates/sui-bridge-indexer/diesel.toml b/crates/sui-bridge-indexer/diesel.toml index d70c5a10f7c45..061dc413ebe1f 100644 --- a/crates/sui-bridge-indexer/diesel.toml +++ b/crates/sui-bridge-indexer/diesel.toml @@ -2,8 +2,8 @@ # see https://diesel.rs/guides/configuring-diesel-cli [print_schema] -file = "src/indexer/schema.rs" -custom_type_derives = ["diesel::query_builder::QueryId"] +file = "src/schema.rs" +#custom_type_derives = ["diesel::query_builder::QueryId"] [migrations_directory] -dir = "src/indexer/migrations" +dir = "src/migrations" diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index c840d78b0a8e7..824f0c012da42 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -1,8 +1,88 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::models::TokenTransfer as DBTokenTransfer; +use crate::models::TokenTransferData as DBTokenTransferData; +use anyhow::anyhow; +use std::fmt::{Display, Formatter}; + pub mod config; pub mod models; pub mod postgres_writer; pub mod schema; pub mod worker; + +pub struct TokenTransfer { + chain_id: u8, + nonce: u64, + block_height: u64, + timestamp_ms: u64, + txn_hash: Vec, + status: TokenTransferStatus, + gas_usage: u64, + data: Option, +} + +pub struct TokenTransferData { + sender_address: Vec, + destination_chain: u8, + recipient_address: Vec, + token_id: u8, + amount: u64, +} + +impl From for DBTokenTransfer { + fn from(value: TokenTransfer) -> Self { + DBTokenTransfer { + chain_id: value.chain_id as i32, + nonce: value.nonce as i64, + block_height: value.block_height as i64, + timestamp_ms: value.timestamp_ms as i64, + txn_hash: value.txn_hash, + status: value.status.to_string(), + gas_usage: value.gas_usage as i64, + } + } +} + +impl TryFrom<&TokenTransfer> for DBTokenTransferData { + type Error = anyhow::Error; + + fn try_from(value: &TokenTransfer) -> Result { + value + .data + .as_ref() + .ok_or(anyhow!( + "Data is empty for TokenTransfer: chain_id = {}, nonce = {}, status = {}", + value.chain_id, + value.nonce, + value.status + )) + .map(|data| DBTokenTransferData { + chain_id: value.chain_id as i32, + nonce: value.nonce as i64, + sender_address: data.sender_address.clone(), + destination_chain: data.destination_chain as i32, + recipient_address: data.recipient_address.clone(), + token_id: data.token_id as i32, + amount: data.amount as i64, + }) + } +} + +enum TokenTransferStatus { + Deposited, + Approved, + Claimed, +} + +impl Display for TokenTransferStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str = match self { + TokenTransferStatus::Deposited => "Deposited", + TokenTransferStatus::Approved => "Approved", + TokenTransferStatus::Claimed => "Claimed", + }; + write!(f, "{str}") + } +} diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000000000..2550cdf407015 --- /dev/null +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS token_transfer; +DROP TABLE IF EXISTS token_transfer_data; diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000000000..13689be26709f --- /dev/null +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,25 @@ +CREATE TABLE token_transfer_data +( + chain_id INT NOT NULL, + nonce BIGINT NOT NULL, + sender_address bytea NOT NULL, + destination_chain INT NOT NULL, + recipient_address bytea NOT NULL, + token_id INT NOT NULL, + amount BIGINT NOT NULL, + PRIMARY KEY(chain_id, nonce) +); +CREATE INDEX token_transfer_data_destination_chain ON token_transfer_data (destination_chain); +CREATE INDEX token_transfer_data_token_id ON token_transfer_data (token_id); + +CREATE TABLE token_transfer +( + chain_id INT NOT NULL, + nonce BIGINT NOT NULL, + block_height BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + txn_hash bytea NOT NULL, + status TEXT NOT NULL, + gas_usage BIGINT NOT NULL, + PRIMARY KEY(chain_id, nonce) +); \ No newline at end of file diff --git a/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql b/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql deleted file mode 100644 index 8f0504ea315cb..0000000000000 --- a/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/down.sql +++ /dev/null @@ -1,2 +0,0 @@ --- This file should undo anything in `up.sql` -DROP TABLE IF EXISTS tokens; diff --git a/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql b/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql deleted file mode 100644 index 682e04232999b..0000000000000 --- a/crates/sui-bridge-indexer/src/migrations/2024-05-16-134945_tokens/up.sql +++ /dev/null @@ -1,10 +0,0 @@ --- Your SQL goes here -CREATE TABLE tokens -( - message_key bytea PRIMARY KEY, - checkpoint bigint NOT NULL, - epoch bigint NOT NULL, - token_type int NOT NULL, - source_chain int NOT NULL, - destination_chain int NOT NULL -); \ No newline at end of file diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index c55fec422bdce..cbe4a1ee9628d 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -1,17 +1,29 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::schema::tokens; -use diesel::prelude::*; +use crate::schema::{token_transfer, token_transfer_data}; +use diesel::{Identifiable, Insertable, Queryable, Selectable}; -#[derive(Queryable, Selectable, Insertable, AsChangeset, Debug)] -#[diesel(table_name = tokens)] -#[diesel(check_for_backend(diesel::pg::Pg))] -pub struct TokenTxn { - pub message_key: Vec, - pub checkpoint: i64, - pub epoch: i64, - pub token_type: i32, - pub source_chain: i32, +#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] +#[diesel(table_name = token_transfer, primary_key(chain_id, nonce))] +pub struct TokenTransfer { + pub chain_id: i32, + pub nonce: i64, + pub block_height: i64, + pub timestamp_ms: i64, + pub txn_hash: Vec, + pub status: String, + pub gas_usage: i64, +} + +#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] +#[diesel(table_name = token_transfer_data, primary_key(chain_id, nonce))] +pub struct TokenTransferData { + pub chain_id: i32, + pub nonce: i64, + pub sender_address: Vec, pub destination_chain: i32, + pub recipient_address: Vec, + pub token_id: i32, + pub amount: i64, } diff --git a/crates/sui-bridge-indexer/src/postgres_writer.rs b/crates/sui-bridge-indexer/src/postgres_writer.rs index 04db4e645330d..068105072cc69 100644 --- a/crates/sui-bridge-indexer/src/postgres_writer.rs +++ b/crates/sui-bridge-indexer/src/postgres_writer.rs @@ -1,7 +1,10 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::{models::TokenTxn, schema::tokens}; +use crate::models::TokenTransfer as DBTokenTransfer; +use crate::models::TokenTransferData as DBTokenTransferData; +use crate::schema::token_transfer_data; +use crate::{schema::token_transfer, TokenTransfer}; use diesel::{ pg::PgConnection, r2d2::{ConnectionManager, Pool}, @@ -18,12 +21,18 @@ pub(crate) fn get_connection_pool(database_url: String) -> PgPool { .expect("Could not build Postgres DB connection pool") } -pub(crate) fn write(pool: &PgPool, token: TokenTxn) { +pub(crate) fn write(pool: &PgPool, token: TokenTransfer) { let connection = &mut pool.get().unwrap(); connection .transaction(|conn| { - diesel::insert_into(tokens::table) - .values(token) + if let Ok(data) = DBTokenTransferData::try_from(&token) { + diesel::insert_into(token_transfer_data::table) + .values(data) + .on_conflict_do_nothing() + .execute(conn)?; + }; + diesel::insert_into(token_transfer::table) + .values(DBTokenTransfer::from(token)) .on_conflict_do_nothing() .execute(conn) }) diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs index 4121f496670c9..8c0be8e05430d 100644 --- a/crates/sui-bridge-indexer/src/schema.rs +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -4,12 +4,27 @@ // @generated automatically by Diesel CLI. diesel::table! { - tokens (message_key) { - message_key -> Bytea, - checkpoint -> Int8, - epoch -> Int8, - token_type -> Int4, - source_chain -> Int4, + token_transfer (chain_id, nonce) { + chain_id -> Int4, + nonce -> Int8, + block_height -> Int8, + timestamp_ms -> Int8, + txn_hash -> Bytea, + status -> Text, + gas_usage -> Int8, + } +} + +diesel::table! { + token_transfer_data (chain_id, nonce) { + chain_id -> Int4, + nonce -> Int8, + sender_address -> Bytea, destination_chain -> Int4, + recipient_address -> Bytea, + token_id -> Int4, + amount -> Int8, } } + +diesel::allow_tables_to_appear_in_same_query!(token_transfer, token_transfer_data,); diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index 0aa0ceae747f7..068b5cb8f6942 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -1,8 +1,8 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::models::TokenTxn; use crate::postgres_writer::{get_connection_pool, write, PgPool}; +use crate::{TokenTransfer, TokenTransferStatus}; use anyhow::Result; use async_trait::async_trait; use std::collections::BTreeSet; @@ -46,17 +46,20 @@ impl BridgeWorker { } // Process a transaction that has been identified as a bridge transaction. - fn process_transaction(&self, tx: &CheckpointTransaction, epoch: u64, checkpoint: u64) { + fn process_transaction(&self, _tx: &CheckpointTransaction, _epoch: u64, _checkpoint: u64) { + // todo create TokenTransfer from checkpoint data println!("SUI: Processing transaction"); - let token_txn = TokenTxn { - message_key: tx.transaction.digest().inner().to_vec(), - checkpoint: checkpoint as i64, - epoch: epoch as i64, - token_type: 4, - source_chain: 2, - destination_chain: 3, + let transfer = TokenTransfer { + chain_id: 0, + nonce: 0, + block_height: 0, + timestamp_ms: Default::default(), + txn_hash: vec![], + status: TokenTransferStatus::Deposited, + gas_usage: 0, + data: None, }; - write(&self.pg_pool, token_txn); + write(&self.pg_pool, transfer); } } From c3108af485f0b1c3b5d17e5cda0d9b899d09d499 Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Tue, 21 May 2024 17:55:53 +0200 Subject: [PATCH 06/13] wip --- crates/sui-bridge-indexer/src/main.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 7bfd637352a94..2f218b3fd0a31 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -85,9 +85,22 @@ async fn main() -> Result<()> { let _task_handle = spawn_logged_monitored_task!( async move { - while let Some(events) = eth_events_rx.recv().await { - println!("ETH: Received events: {:?}", events); - // TODO: process Eth event + while let Some(event) = eth_events_rx.recv().await { + let func_sig_hash = event + .2 + .first() + .and_then(|event_topic| event_topic.log.topics.first()); + + let address = event.0; + + if func_sig_hash.is_none() || address != bridge_address { + continue; + } + + // TODO: check if the func_sig_hash is the "TokensDeposited" or "TokensClaimed" events + + println!("ETH: Received events: {:?}", func_sig_hash); + println!("Event received from: {:?}", address); } }, "indexer handler" From 9f894fa470203128cbdb55b3522bd2cf3eec9f2b Mon Sep 17 00:00:00 2001 From: patrick Date: Tue, 21 May 2024 18:40:18 +0200 Subject: [PATCH 07/13] process SUI bridge tx events --- Cargo.lock | 1 + crates/sui-bridge-indexer/Cargo.toml | 1 + crates/sui-bridge-indexer/src/lib.rs | 4 +- .../up.sql | 4 +- crates/sui-bridge-indexer/src/models.rs | 2 +- crates/sui-bridge-indexer/src/schema.rs | 4 +- crates/sui-bridge-indexer/src/worker.rs | 91 +++++++++++++++---- 7 files changed, 82 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 56b559b3a8ecc..9f8060804f3a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12044,6 +12044,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "bcs", "bin-version", "clap", "diesel", diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml index c4147e24c7abf..403313476d567 100644 --- a/crates/sui-bridge-indexer/Cargo.toml +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -20,6 +20,7 @@ tracing.workspace = true bin-version.workspace = true anyhow.workspace = true mysten-metrics.workspace = true +bcs.workspace = true [dev-dependencies] sui-types = { workspace = true, features = ["test-utils"] } diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index 824f0c012da42..8a180d5ab0948 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -19,7 +19,7 @@ pub struct TokenTransfer { timestamp_ms: u64, txn_hash: Vec, status: TokenTransferStatus, - gas_usage: u64, + gas_usage: i64, data: Option, } @@ -40,7 +40,7 @@ impl From for DBTokenTransfer { timestamp_ms: value.timestamp_ms as i64, txn_hash: value.txn_hash, status: value.status.to_string(), - gas_usage: value.gas_usage as i64, + gas_usage: value.gas_usage, } } } diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql index 13689be26709f..8b4705b439b79 100644 --- a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -16,10 +16,10 @@ CREATE TABLE token_transfer ( chain_id INT NOT NULL, nonce BIGINT NOT NULL, + status TEXT NOT NULL, block_height BIGINT NOT NULL, timestamp_ms BIGINT NOT NULL, txn_hash bytea NOT NULL, - status TEXT NOT NULL, gas_usage BIGINT NOT NULL, - PRIMARY KEY(chain_id, nonce) + PRIMARY KEY(chain_id, nonce, status) ); \ No newline at end of file diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index cbe4a1ee9628d..09e4949fe3688 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -9,10 +9,10 @@ use diesel::{Identifiable, Insertable, Queryable, Selectable}; pub struct TokenTransfer { pub chain_id: i32, pub nonce: i64, + pub status: String, pub block_height: i64, pub timestamp_ms: i64, pub txn_hash: Vec, - pub status: String, pub gas_usage: i64, } diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs index 8c0be8e05430d..f9b31f003822a 100644 --- a/crates/sui-bridge-indexer/src/schema.rs +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -4,13 +4,13 @@ // @generated automatically by Diesel CLI. diesel::table! { - token_transfer (chain_id, nonce) { + token_transfer (chain_id, nonce, status) { chain_id -> Int4, nonce -> Int8, + status -> Text, block_height -> Int8, timestamp_ms -> Int8, txn_hash -> Bytea, - status -> Text, gas_usage -> Int8, } } diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index 068b5cb8f6942..cce785822d9dd 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -2,16 +2,20 @@ // SPDX-License-Identifier: Apache-2.0 use crate::postgres_writer::{get_connection_pool, write, PgPool}; -use crate::{TokenTransfer, TokenTransferStatus}; +use crate::{TokenTransfer, TokenTransferData, TokenTransferStatus}; use anyhow::Result; use async_trait::async_trait; use std::collections::BTreeSet; +use sui_bridge::events::{ + MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, +}; use sui_data_ingestion_core::Worker; +use sui_types::effects::TransactionEffectsAPI; use sui_types::{ base_types::ObjectID, full_checkpoint_content::{CheckpointData, CheckpointTransaction}, transaction::{TransactionDataAPI, TransactionKind}, - SUI_BRIDGE_OBJECT_ID, + BRIDGE_ADDRESS, SUI_BRIDGE_OBJECT_ID, }; use tracing::info; @@ -46,20 +50,71 @@ impl BridgeWorker { } // Process a transaction that has been identified as a bridge transaction. - fn process_transaction(&self, _tx: &CheckpointTransaction, _epoch: u64, _checkpoint: u64) { - // todo create TokenTransfer from checkpoint data - println!("SUI: Processing transaction"); - let transfer = TokenTransfer { - chain_id: 0, - nonce: 0, - block_height: 0, - timestamp_ms: Default::default(), - txn_hash: vec![], - status: TokenTransferStatus::Deposited, - gas_usage: 0, - data: None, - }; - write(&self.pg_pool, transfer); + fn process_transaction(&self, tx: &CheckpointTransaction, checkpoint: u64, timestamp_ms: u64) { + if let Some(event) = &tx.events { + event.data.iter().for_each(|ev| { + if ev.type_.address == BRIDGE_ADDRESS { + println!("SUI: Processing bridge event : {:?}", ev.type_); + let token_transfer = match ev.type_.name.as_str() { + "TokenDepositedEvent" => { + // todo: handle deserialization error + let event: MoveTokenDepositedEvent = + bcs::from_bytes(&ev.contents).unwrap(); + Some(TokenTransfer { + chain_id: event.source_chain, + nonce: event.seq_num, + block_height: checkpoint, + timestamp_ms, + txn_hash: tx.transaction.digest().inner().to_vec(), + status: TokenTransferStatus::Deposited, + gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data: Some(TokenTransferData { + sender_address: event.sender_address, + destination_chain: event.target_chain, + recipient_address: event.target_address, + token_id: event.token_type, + amount: event.amount_sui_adjusted, + }), + }) + } + "TokenTransferApproved" => { + let event: MoveTokenTransferApproved = + bcs::from_bytes(&ev.contents).unwrap(); + Some(TokenTransfer { + chain_id: event.message_key.source_chain, + nonce: event.message_key.bridge_seq_num, + block_height: checkpoint, + timestamp_ms, + txn_hash: tx.transaction.digest().inner().to_vec(), + status: TokenTransferStatus::Approved, + gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data: None, + }) + } + "TokenTransferClaimed" => { + let event: MoveTokenTransferClaimed = + bcs::from_bytes(&ev.contents).unwrap(); + Some(TokenTransfer { + chain_id: event.message_key.source_chain, + nonce: event.message_key.bridge_seq_num, + block_height: checkpoint, + timestamp_ms, + txn_hash: tx.transaction.digest().inner().to_vec(), + status: TokenTransferStatus::Claimed, + gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data: None, + }) + } + _ => None, + }; + + if let Some(transfer) = token_transfer { + println!("SUI: Storing bridge event : {:?}", ev.type_); + write(&self.pg_pool, transfer); + } + }; + }); + } } } @@ -72,13 +127,13 @@ impl Worker for BridgeWorker { checkpoint.checkpoint_summary.sequence_number, checkpoint.transactions.len(), ); - let epoch = checkpoint.checkpoint_summary.epoch; let checkpoint_num = checkpoint.checkpoint_summary.sequence_number; + let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms; checkpoint .transactions .iter() .filter(|txn| self.is_bridge_transaction(txn)) - .for_each(|txn| self.process_transaction(txn, epoch, checkpoint_num)); + .for_each(|txn| self.process_transaction(txn, checkpoint_num, timestamp_ms)); Ok(()) } } From bb02f197aff3aeed7488e57012f9b0c4c31af46a Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Tue, 21 May 2024 19:34:08 +0200 Subject: [PATCH 08/13] Update main.rs --- crates/sui-bridge-indexer/src/main.rs | 48 +++++++++++++++++---------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 2f218b3fd0a31..87f481727a784 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -14,6 +14,8 @@ use std::net::Ipv4Addr; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; +use sui_bridge::abi::EthBridgeEvent; +use sui_bridge::abi::EthSuiBridgeEvents; use sui_bridge::{ abi::{EthBridgeCommittee, EthSuiBridge}, eth_client::EthClient, @@ -70,13 +72,7 @@ async fn main() -> Result<()> { ) .await?, ); - let contract_addresses = HashMap::from_iter(vec![ - (bridge_address, config.start_block), - (committee_address, config.start_block), - (config_address, config.start_block), - (limiter_address, config.start_block), - (vault_address, config.start_block), - ]); + let contract_addresses = HashMap::from_iter(vec![(bridge_address, config.start_block)]); let (_task_handles, mut eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) .run() @@ -86,21 +82,37 @@ async fn main() -> Result<()> { let _task_handle = spawn_logged_monitored_task!( async move { while let Some(event) = eth_events_rx.recv().await { - let func_sig_hash = event + let bridge_events = event .2 - .first() - .and_then(|event_topic| event_topic.log.topics.first()); + .iter() + .map(EthBridgeEvent::try_from_eth_log) + .collect::>(); - let address = event.0; + for (log, opt_bridge_event) in event.2.iter().zip(bridge_events) { + if opt_bridge_event.is_none() { + // TODO: we probably should not miss any events, warn for now. + // warn!("Eth event not recognized: {:?}", log); + continue; + } + // Unwrap safe: checked above + let bridge_event = opt_bridge_event.unwrap(); + // println!("Observed Eth bridge event: {:#?}", bridge_event); - if func_sig_hash.is_none() || address != bridge_address { - continue; + match bridge_event { + EthBridgeEvent::EthSuiBridgeEvents(bridge_event) => match bridge_event { + EthSuiBridgeEvents::TokensDepositedFilter(bridge_event) => { + println!("TokensDeposited: {:#?}", bridge_event) + } + EthSuiBridgeEvents::TokensClaimedFilter(bridge_event) => { + println!("TokensClaimed: {:#?}", bridge_event) + } + EthSuiBridgeEvents::PausedFilter(_bridge_event) + | EthSuiBridgeEvents::UnpausedFilter(_bridge_event) + | EthSuiBridgeEvents::UpgradedFilter(_bridge_event) + | EthSuiBridgeEvents::InitializedFilter(_bridge_event) => (), + }, + } } - - // TODO: check if the func_sig_hash is the "TokensDeposited" or "TokensClaimed" events - - println!("ETH: Received events: {:?}", func_sig_hash); - println!("Event received from: {:?}", address); } }, "indexer handler" From ed227820e14d5933bb5a943f9daae765d08d484e Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Wed, 22 May 2024 13:29:30 +0200 Subject: [PATCH 09/13] WIP --- crates/sui-bridge-indexer/src/lib.rs | 2 +- crates/sui-bridge-indexer/src/main.rs | 49 ++--------- .../sui-bridge-indexer/src/postgres_writer.rs | 4 +- crates/sui-bridge-indexer/src/worker.rs | 83 ++++++++++++++++++- 4 files changed, 94 insertions(+), 44 deletions(-) diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index 8a180d5ab0948..6fb95e6887ad4 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -70,7 +70,7 @@ impl TryFrom<&TokenTransfer> for DBTokenTransferData { } } -enum TokenTransferStatus { +pub(crate) enum TokenTransferStatus { Deposited, Approved, Claimed, diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 87f481727a784..e6f83eb9e0d73 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -14,15 +14,15 @@ use std::net::Ipv4Addr; use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; -use sui_bridge::abi::EthBridgeEvent; -use sui_bridge::abi::EthSuiBridgeEvents; use sui_bridge::{ abi::{EthBridgeCommittee, EthSuiBridge}, eth_client::EthClient, eth_syncer::EthSyncer, }; - -use sui_bridge_indexer::{config::BridgeIndexerConfig, worker::BridgeWorker}; +use sui_bridge_indexer::postgres_writer::get_connection_pool; +use sui_bridge_indexer::{ + config::BridgeIndexerConfig, worker::process_eth_transaction, worker::BridgeWorker, +}; use sui_data_ingestion_core::{ DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool, }; @@ -72,6 +72,7 @@ async fn main() -> Result<()> { ) .await?, ); + let contract_addresses = HashMap::from_iter(vec![(bridge_address, config.start_block)]); let (_task_handles, mut eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) @@ -79,42 +80,10 @@ async fn main() -> Result<()> { .await .expect("Failed to start eth syncer"); - let _task_handle = spawn_logged_monitored_task!( - async move { - while let Some(event) = eth_events_rx.recv().await { - let bridge_events = event - .2 - .iter() - .map(EthBridgeEvent::try_from_eth_log) - .collect::>(); - - for (log, opt_bridge_event) in event.2.iter().zip(bridge_events) { - if opt_bridge_event.is_none() { - // TODO: we probably should not miss any events, warn for now. - // warn!("Eth event not recognized: {:?}", log); - continue; - } - // Unwrap safe: checked above - let bridge_event = opt_bridge_event.unwrap(); - // println!("Observed Eth bridge event: {:#?}", bridge_event); + let pg_pool = get_connection_pool(config.db_url.clone()); - match bridge_event { - EthBridgeEvent::EthSuiBridgeEvents(bridge_event) => match bridge_event { - EthSuiBridgeEvents::TokensDepositedFilter(bridge_event) => { - println!("TokensDeposited: {:#?}", bridge_event) - } - EthSuiBridgeEvents::TokensClaimedFilter(bridge_event) => { - println!("TokensClaimed: {:#?}", bridge_event) - } - EthSuiBridgeEvents::PausedFilter(_bridge_event) - | EthSuiBridgeEvents::UnpausedFilter(_bridge_event) - | EthSuiBridgeEvents::UpgradedFilter(_bridge_event) - | EthSuiBridgeEvents::InitializedFilter(_bridge_event) => (), - }, - } - } - } - }, + let _task_handle = spawn_logged_monitored_task!( + process_eth_transaction(eth_events_rx, provider.clone(), pg_pool), "indexer handler" ); @@ -122,7 +91,7 @@ async fn main() -> Result<()> { let progress_store = FileProgressStore::new(config.progress_store_file); let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics); let worker_pool = WorkerPool::new( - BridgeWorker::new(vec![], config.db_url), + BridgeWorker::new(vec![], config.db_url.clone()), "bridge worker".into(), config.concurrency, ); diff --git a/crates/sui-bridge-indexer/src/postgres_writer.rs b/crates/sui-bridge-indexer/src/postgres_writer.rs index 068105072cc69..22e8c033ec295 100644 --- a/crates/sui-bridge-indexer/src/postgres_writer.rs +++ b/crates/sui-bridge-indexer/src/postgres_writer.rs @@ -13,7 +13,7 @@ use diesel::{ pub(crate) type PgPool = Pool>; -pub(crate) fn get_connection_pool(database_url: String) -> PgPool { +pub fn get_connection_pool(database_url: String) -> PgPool { let manager = ConnectionManager::::new(database_url); Pool::builder() .test_on_check_out(true) @@ -21,7 +21,7 @@ pub(crate) fn get_connection_pool(database_url: String) -> PgPool { .expect("Could not build Postgres DB connection pool") } -pub(crate) fn write(pool: &PgPool, token: TokenTransfer) { +pub fn write(pool: &PgPool, token: TokenTransfer) { let connection = &mut pool.get().unwrap(); connection .transaction(|conn| { diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index cce785822d9dd..a7b28c2bc1dd6 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -5,10 +5,16 @@ use crate::postgres_writer::{get_connection_pool, write, PgPool}; use crate::{TokenTransfer, TokenTransferData, TokenTransferStatus}; use anyhow::Result; use async_trait::async_trait; +use ethers::providers::Provider; +use ethers::providers::{Http, Middleware}; +use ethers::types::Address as EthAddress; use std::collections::BTreeSet; +use std::sync::Arc; +use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents}; use sui_bridge::events::{ MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, }; +use sui_bridge::types::EthLog; use sui_data_ingestion_core::Worker; use sui_types::effects::TransactionEffectsAPI; use sui_types::{ @@ -54,9 +60,9 @@ impl BridgeWorker { if let Some(event) = &tx.events { event.data.iter().for_each(|ev| { if ev.type_.address == BRIDGE_ADDRESS { - println!("SUI: Processing bridge event : {:?}", ev.type_); let token_transfer = match ev.type_.name.as_str() { "TokenDepositedEvent" => { + println!("Observed Sui Deposit"); // todo: handle deserialization error let event: MoveTokenDepositedEvent = bcs::from_bytes(&ev.contents).unwrap(); @@ -78,6 +84,7 @@ impl BridgeWorker { }) } "TokenTransferApproved" => { + println!("Observed Sui Approval"); let event: MoveTokenTransferApproved = bcs::from_bytes(&ev.contents).unwrap(); Some(TokenTransfer { @@ -92,6 +99,7 @@ impl BridgeWorker { }) } "TokenTransferClaimed" => { + println!("Observed Sui Claim"); let event: MoveTokenTransferClaimed = bcs::from_bytes(&ev.contents).unwrap(); Some(TokenTransfer { @@ -118,6 +126,79 @@ impl BridgeWorker { } } +pub async fn process_eth_transaction( + mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, + provider: Arc>, + pool: PgPool, +) { + while let Some(event) = eth_events_rx.recv().await { + for log in event.2.iter() { + let eth_bridge_event = EthBridgeEvent::try_from_eth_log(log); + if eth_bridge_event.is_none() { + continue; + } + let bridge_event = eth_bridge_event.unwrap(); + let block_number = log.block_number; + let block = provider.get_block(log.block_number).await.unwrap().unwrap(); + let timestamp = block.timestamp.as_u64() * 1000; + let transaction = provider + .get_transaction(log.tx_hash) + .await + .unwrap() + .unwrap(); + let gas = transaction.gas; + let tx_hash = log.tx_hash; + + println!("Observed Eth bridge event: {:#?}", bridge_event); + + match bridge_event { + EthBridgeEvent::EthSuiBridgeEvents(bridge_event) => match bridge_event { + EthSuiBridgeEvents::TokensDepositedFilter(bridge_event) => { + println!("Observed Eth Deposit"); + let transfer = TokenTransfer { + chain_id: bridge_event.source_chain_id, + nonce: bridge_event.nonce, + block_height: block_number, + timestamp_ms: timestamp, + txn_hash: tx_hash.as_bytes().to_vec(), + status: TokenTransferStatus::Deposited, + gas_usage: gas.as_u64() as i64, + data: Some(TokenTransferData { + sender_address: bridge_event.sender_address.as_bytes().to_vec(), + destination_chain: bridge_event.destination_chain_id, + recipient_address: bridge_event.recipient_address.to_vec(), + token_id: bridge_event.token_id, + amount: bridge_event.sui_adjusted_amount, + }), + }; + + write(&pool, transfer); + } + EthSuiBridgeEvents::TokensClaimedFilter(bridge_event) => { + println!("Observed Eth Claim"); + let transfer = TokenTransfer { + chain_id: bridge_event.source_chain_id, + nonce: bridge_event.nonce, + block_height: block_number, + timestamp_ms: timestamp, + txn_hash: tx_hash.as_bytes().to_vec(), + status: TokenTransferStatus::Claimed, + gas_usage: gas.as_u64() as i64, + data: None, + }; + + write(&pool, transfer); + } + EthSuiBridgeEvents::PausedFilter(_bridge_event) => (), + EthSuiBridgeEvents::UnpausedFilter(_bridge_event) => (), + EthSuiBridgeEvents::UpgradedFilter(_bridge_event) => (), + EthSuiBridgeEvents::InitializedFilter(_bridge_event) => (), + }, + } + } + } +} + #[async_trait] impl Worker for BridgeWorker { async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> { From 0a3c4691c874d7f981ba50d95fb068154d4e5f76 Mon Sep 17 00:00:00 2001 From: patrick Date: Wed, 22 May 2024 15:30:46 +0200 Subject: [PATCH 10/13] add data source column --- crates/sui-bridge-indexer/src/lib.rs | 17 +++++++++++++++++ crates/sui-bridge-indexer/src/main.rs | 2 +- .../00000000000000_diesel_initial_setup/up.sql | 1 + crates/sui-bridge-indexer/src/models.rs | 1 + crates/sui-bridge-indexer/src/schema.rs | 1 + crates/sui-bridge-indexer/src/worker.rs | 15 ++++++++++----- 6 files changed, 31 insertions(+), 6 deletions(-) diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index 6fb95e6887ad4..bcd9beaedf67a 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -20,6 +20,7 @@ pub struct TokenTransfer { txn_hash: Vec, status: TokenTransferStatus, gas_usage: i64, + data_source: BridgeDataSource, data: Option, } @@ -41,6 +42,7 @@ impl From for DBTokenTransfer { txn_hash: value.txn_hash, status: value.status.to_string(), gas_usage: value.gas_usage, + data_source: value.data_source.to_string(), } } } @@ -86,3 +88,18 @@ impl Display for TokenTransferStatus { write!(f, "{str}") } } + +enum BridgeDataSource { + SUI, + ETH, +} + +impl Display for BridgeDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str = match self { + BridgeDataSource::ETH => "ETH", + BridgeDataSource::SUI => "SUI", + }; + write!(f, "{str}") + } +} diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index e6f83eb9e0d73..09210840ffb56 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -75,7 +75,7 @@ async fn main() -> Result<()> { let contract_addresses = HashMap::from_iter(vec![(bridge_address, config.start_block)]); - let (_task_handles, mut eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) + let (_task_handles, eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) .run() .await .expect("Failed to start eth syncer"); diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql index 8b4705b439b79..fffdaa5e6234b 100644 --- a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -21,5 +21,6 @@ CREATE TABLE token_transfer timestamp_ms BIGINT NOT NULL, txn_hash bytea NOT NULL, gas_usage BIGINT NOT NULL, + data_source TEXT NOT NULL, PRIMARY KEY(chain_id, nonce, status) ); \ No newline at end of file diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index 09e4949fe3688..ee0bcc88ab646 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -14,6 +14,7 @@ pub struct TokenTransfer { pub timestamp_ms: i64, pub txn_hash: Vec, pub gas_usage: i64, + pub data_source: String, } #[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs index f9b31f003822a..e13e095c22654 100644 --- a/crates/sui-bridge-indexer/src/schema.rs +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -12,6 +12,7 @@ diesel::table! { timestamp_ms -> Int8, txn_hash -> Bytea, gas_usage -> Int8, + data_source -> Text, } } diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index a7b28c2bc1dd6..d84d3a473be99 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::postgres_writer::{get_connection_pool, write, PgPool}; -use crate::{TokenTransfer, TokenTransferData, TokenTransferStatus}; +use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus}; use anyhow::Result; use async_trait::async_trait; use ethers::providers::Provider; @@ -74,6 +74,7 @@ impl BridgeWorker { txn_hash: tx.transaction.digest().inner().to_vec(), status: TokenTransferStatus::Deposited, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::SUI, data: Some(TokenTransferData { sender_address: event.sender_address, destination_chain: event.target_chain, @@ -95,6 +96,7 @@ impl BridgeWorker { txn_hash: tx.transaction.digest().inner().to_vec(), status: TokenTransferStatus::Approved, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::SUI, data: None, }) } @@ -110,6 +112,7 @@ impl BridgeWorker { txn_hash: tx.transaction.digest().inner().to_vec(), status: TokenTransferStatus::Claimed, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::SUI, data: None, }) } @@ -163,6 +166,7 @@ pub async fn process_eth_transaction( txn_hash: tx_hash.as_bytes().to_vec(), status: TokenTransferStatus::Deposited, gas_usage: gas.as_u64() as i64, + data_source: BridgeDataSource::ETH, data: Some(TokenTransferData { sender_address: bridge_event.sender_address.as_bytes().to_vec(), destination_chain: bridge_event.destination_chain_id, @@ -184,15 +188,16 @@ pub async fn process_eth_transaction( txn_hash: tx_hash.as_bytes().to_vec(), status: TokenTransferStatus::Claimed, gas_usage: gas.as_u64() as i64, + data_source: BridgeDataSource::ETH, data: None, }; write(&pool, transfer); } - EthSuiBridgeEvents::PausedFilter(_bridge_event) => (), - EthSuiBridgeEvents::UnpausedFilter(_bridge_event) => (), - EthSuiBridgeEvents::UpgradedFilter(_bridge_event) => (), - EthSuiBridgeEvents::InitializedFilter(_bridge_event) => (), + EthSuiBridgeEvents::PausedFilter(_) + | EthSuiBridgeEvents::UnpausedFilter(_) + | EthSuiBridgeEvents::UpgradedFilter(_) + | EthSuiBridgeEvents::InitializedFilter(_) => (), }, } } From c362d1a9a943d0c6d23e1de9551e0bcacf558416 Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Thu, 23 May 2024 14:34:55 +0200 Subject: [PATCH 11/13] Add config file --- Cargo.lock | 2 + crates/sui-bridge-indexer/Cargo.toml | 2 + crates/sui-bridge-indexer/src/config.rs | 57 +++++++---------------- crates/sui-bridge-indexer/src/config.yaml | 18 +++++++ crates/sui-bridge-indexer/src/main.rs | 34 ++++++++++---- 5 files changed, 66 insertions(+), 47 deletions(-) create mode 100644 crates/sui-bridge-indexer/src/config.yaml diff --git a/Cargo.lock b/Cargo.lock index 9f8060804f3a1..137bdd8153937 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12052,6 +12052,8 @@ dependencies = [ "hex-literal 0.3.4", "mysten-metrics", "prometheus", + "serde", + "serde_yaml 0.8.26", "sui-bridge", "sui-config", "sui-data-ingestion-core", diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml index 403313476d567..66833b7044eea 100644 --- a/crates/sui-bridge-indexer/Cargo.toml +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -7,6 +7,7 @@ publish = false edition = "2021" [dependencies] +serde.workspace = true diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] } ethers = "2.0" tokio = { workspace = true, features = ["full"] } @@ -21,6 +22,7 @@ bin-version.workspace = true anyhow.workspace = true mysten-metrics.workspace = true bcs.workspace = true +serde_yaml.workspace = true [dev-dependencies] sui-types = { workspace = true, features = ["test-utils"] } diff --git a/crates/sui-bridge-indexer/src/config.rs b/crates/sui-bridge-indexer/src/config.rs index 0f2ab39ac0064..cbfcf3e76a97b 100644 --- a/crates/sui-bridge-indexer/src/config.rs +++ b/crates/sui-bridge-indexer/src/config.rs @@ -1,47 +1,26 @@ +use anyhow::Result; +use std::{fs, path::Path}; + // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use serde::Deserialize; -use clap::*; -use std::path::PathBuf; - -#[derive(Parser, Clone, Debug)] -#[clap( - name = "Bridge Indexer", - about = "Run an indexer for the bridge.\n\ - It uses the data ingestion framework to read sui transactions and listens\n\ - to Ethereum events in order to generate data related to the bridge.\n\ - Data is written to postgres tables and can be used for dashboards and general checks\n\ - on bridge health.", - rename_all = "kebab-case" -)] -pub struct BridgeIndexerConfig { - /// URL of the sui remote store. - #[clap(long, short = 'r', required = true)] - pub remote_store_url: Option, - /// URL for Eth fullnode. - #[clap(long, short = 'e', required = true)] +/// config as loaded from `config.yaml`. +#[derive(Debug, Deserialize)] +pub struct Config { + pub remote_store_url: String, pub eth_rpc_url: String, - /// URL of the DB instance holding indexed bridge data. - #[clap(long, short = 'd', required = true)] pub db_url: String, - /// Path to the file where the progress store is stored. - #[clap( - long, - short = 'p', - default_value = "/tmp/progress_store", - global = true - )] - pub progress_store_file: PathBuf, - /// Path to the directory where the checkpoints are stored. - #[clap(long, short = 'c', default_value = "/tmp", global = true)] - pub checkpoints_path: PathBuf, - /// Number of worker threads to use. - #[clap(long, short = 't', default_value = "1", global = true)] - pub concurrency: usize, - /// Address of the SuiBridge contract - #[clap(long, required = true, short = 'a')] + pub progress_store_file: String, + pub checkpoints_path: String, + pub concurrency: u64, pub eth_sui_bridge_contract_address: String, - /// Block to start indexing from - #[clap(long, required = true, short = 's')] pub start_block: u64, } + +/// Load the config to run. +pub fn load_config(path: &Path) -> Result { + let reader = fs::File::open(path)?; + let config: Config = serde_yaml::from_reader(reader)?; + Ok(config) +} diff --git a/crates/sui-bridge-indexer/src/config.yaml b/crates/sui-bridge-indexer/src/config.yaml new file mode 100644 index 0000000000000..f4644c7a7c71e --- /dev/null +++ b/crates/sui-bridge-indexer/src/config.yaml @@ -0,0 +1,18 @@ +# config.yaml format: + +# URL of the remote store +# remote_store_url: +# URL for Ethereum RPC +# eth_rpc_url: +# Database connection URL +# db_url: +# File to store the progress +# progress_store_file: +# Path to the checkpoints +# checkpoints_path: +# Number of concurrent operations +# concurrency: 1 +# Ethereum to Sui bridge contract address +# eth_sui_bridge_contract_address: +# Starting block number +# start_block: \ No newline at end of file diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 09210840ffb56..03377628e08a1 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -2,16 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use clap::Parser; +use clap::*; use ethers::types::Address as EthAddress; use mysten_metrics::spawn_logged_monitored_task; use mysten_metrics::start_prometheus_server; use prometheus::Registry; use std::collections::HashMap; use std::collections::HashSet; +use std::env; use std::net::IpAddr; use std::net::Ipv4Addr; use std::net::SocketAddr; +use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use sui_bridge::{ @@ -21,7 +23,7 @@ use sui_bridge::{ }; use sui_bridge_indexer::postgres_writer::get_connection_pool; use sui_bridge_indexer::{ - config::BridgeIndexerConfig, worker::process_eth_transaction, worker::BridgeWorker, + config::load_config, worker::process_eth_transaction, worker::BridgeWorker, }; use sui_data_ingestion_core::{ DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool, @@ -29,10 +31,26 @@ use sui_data_ingestion_core::{ use tokio::sync::oneshot; use tracing::info; +#[derive(Parser, Clone, Debug)] +#[clap(group(ArgGroup::new("input").required(true).args(&["config_path"])))] +struct Args { + /// Path to a yaml config + #[clap(long, short, default_value = "config.yaml")] + config_path: Option, +} + #[tokio::main] async fn main() -> Result<()> { - let config = BridgeIndexerConfig::parse(); - info!("Parsed config: {:#?}", config); + let args = Args::parse(); + + // load config + let config_path = if let Some(path) = args.config_path { + path.join("config.yaml") + } else { + env::current_dir().unwrap().join("config.yaml") + }; + + let config = load_config(&config_path).unwrap(); // start metrics server let (_exit_sender, exit_receiver) = oneshot::channel(); @@ -88,18 +106,18 @@ async fn main() -> Result<()> { ); // start sui side - let progress_store = FileProgressStore::new(config.progress_store_file); + let progress_store = FileProgressStore::new(config.progress_store_file.into()); let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics); let worker_pool = WorkerPool::new( BridgeWorker::new(vec![], config.db_url.clone()), "bridge worker".into(), - config.concurrency, + config.concurrency as usize, ); executor.register(worker_pool).await?; executor .run( - config.checkpoints_path, - config.remote_store_url, + config.checkpoints_path.into(), + Some(config.remote_store_url), vec![], // optional remote store access options ReaderOptions::default(), exit_receiver, From 39b578bf85ddea464213fd39f831ac808f3b6f74 Mon Sep 17 00:00:00 2001 From: Bridgerz Date: Thu, 23 May 2024 14:56:18 +0200 Subject: [PATCH 12/13] fix config --- crates/sui-bridge-indexer/{src => }/config.yaml | 0 crates/sui-bridge-indexer/src/main.rs | 3 +-- 2 files changed, 1 insertion(+), 2 deletions(-) rename crates/sui-bridge-indexer/{src => }/config.yaml (100%) diff --git a/crates/sui-bridge-indexer/src/config.yaml b/crates/sui-bridge-indexer/config.yaml similarity index 100% rename from crates/sui-bridge-indexer/src/config.yaml rename to crates/sui-bridge-indexer/config.yaml diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 03377628e08a1..81abfaba04d13 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -32,10 +32,9 @@ use tokio::sync::oneshot; use tracing::info; #[derive(Parser, Clone, Debug)] -#[clap(group(ArgGroup::new("input").required(true).args(&["config_path"])))] struct Args { /// Path to a yaml config - #[clap(long, short, default_value = "config.yaml")] + #[clap(long, short)] config_path: Option, } From f98c901e4ecb24f8213317fe0bbf26158f860e0b Mon Sep 17 00:00:00 2001 From: Dario Russi <113150618+dariorussi@users.noreply.github.com> Date: Thu, 23 May 2024 15:38:21 +0200 Subject: [PATCH 13/13] random fixes --- crates/sui-bridge-indexer/src/config.rs | 6 +++--- crates/sui-bridge-indexer/src/lib.rs | 8 ++++---- crates/sui-bridge-indexer/src/worker.rs | 14 +++++++++----- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/crates/sui-bridge-indexer/src/config.rs b/crates/sui-bridge-indexer/src/config.rs index cbfcf3e76a97b..1909ac71f0b48 100644 --- a/crates/sui-bridge-indexer/src/config.rs +++ b/crates/sui-bridge-indexer/src/config.rs @@ -1,9 +1,9 @@ -use anyhow::Result; -use std::{fs, path::Path}; - // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; use serde::Deserialize; +use std::{fs, path::Path}; /// config as loaded from `config.yaml`. #[derive(Debug, Deserialize)] diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs index bcd9beaedf67a..97bee58203d8c 100644 --- a/crates/sui-bridge-indexer/src/lib.rs +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -90,15 +90,15 @@ impl Display for TokenTransferStatus { } enum BridgeDataSource { - SUI, - ETH, + Sui, + Eth, } impl Display for BridgeDataSource { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let str = match self { - BridgeDataSource::ETH => "ETH", - BridgeDataSource::SUI => "SUI", + BridgeDataSource::Eth => "ETH", + BridgeDataSource::Sui => "SUI", }; write!(f, "{str}") } diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs index d84d3a473be99..b05bba87ffdda 100644 --- a/crates/sui-bridge-indexer/src/worker.rs +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -74,7 +74,7 @@ impl BridgeWorker { txn_hash: tx.transaction.digest().inner().to_vec(), status: TokenTransferStatus::Deposited, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), - data_source: BridgeDataSource::SUI, + data_source: BridgeDataSource::Sui, data: Some(TokenTransferData { sender_address: event.sender_address, destination_chain: event.target_chain, @@ -96,7 +96,7 @@ impl BridgeWorker { txn_hash: tx.transaction.digest().inner().to_vec(), status: TokenTransferStatus::Approved, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), - data_source: BridgeDataSource::SUI, + data_source: BridgeDataSource::Sui, data: None, }) } @@ -112,7 +112,7 @@ impl BridgeWorker { txn_hash: tx.transaction.digest().inner().to_vec(), status: TokenTransferStatus::Claimed, gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), - data_source: BridgeDataSource::SUI, + data_source: BridgeDataSource::Sui, data: None, }) } @@ -166,7 +166,7 @@ pub async fn process_eth_transaction( txn_hash: tx_hash.as_bytes().to_vec(), status: TokenTransferStatus::Deposited, gas_usage: gas.as_u64() as i64, - data_source: BridgeDataSource::ETH, + data_source: BridgeDataSource::Eth, data: Some(TokenTransferData { sender_address: bridge_event.sender_address.as_bytes().to_vec(), destination_chain: bridge_event.destination_chain_id, @@ -188,7 +188,7 @@ pub async fn process_eth_transaction( txn_hash: tx_hash.as_bytes().to_vec(), status: TokenTransferStatus::Claimed, gas_usage: gas.as_u64() as i64, - data_source: BridgeDataSource::ETH, + data_source: BridgeDataSource::Eth, data: None, }; @@ -199,6 +199,10 @@ pub async fn process_eth_transaction( | EthSuiBridgeEvents::UpgradedFilter(_) | EthSuiBridgeEvents::InitializedFilter(_) => (), }, + EthBridgeEvent::EthBridgeCommitteeEvents(_) + | EthBridgeEvent::EthBridgeLimiterEvents(_) + | EthBridgeEvent::EthBridgeConfigEvents(_) + | EthBridgeEvent::EthCommitteeUpgradeableContractEvents(_) => (), } } }