diff --git a/Cargo.lock b/Cargo.lock index df8d287b72a85..137bdd8153937 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12038,6 +12038,32 @@ dependencies = [ "tracing", ] +[[package]] +name = "sui-bridge-indexer" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bcs", + "bin-version", + "clap", + "diesel", + "ethers", + "hex-literal 0.3.4", + "mysten-metrics", + "prometheus", + "serde", + "serde_yaml 0.8.26", + "sui-bridge", + "sui-config", + "sui-data-ingestion-core", + "sui-test-transaction-builder", + "sui-types", + "test-cluster", + "tokio", + "tracing", +] + [[package]] name = "sui-cluster-test" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 247d3ba5b003f..1393c3514ea34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -93,6 +93,7 @@ members = [ "crates/sui-benchmark", "crates/sui-bridge", "crates/sui-bridge-cli", + "crates/sui-bridge-indexer", "crates/sui-cluster-test", "crates/sui-common", "crates/sui-config", diff --git a/crates/sui-bridge-indexer/Cargo.toml b/crates/sui-bridge-indexer/Cargo.toml new file mode 100644 index 0000000000000..66833b7044eea --- /dev/null +++ b/crates/sui-bridge-indexer/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "sui-bridge-indexer" +version = "0.1.0" +authors = ["Mysten Labs "] +license = "Apache-2.0" +publish = false +edition = "2021" + +[dependencies] +serde.workspace = true +diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] } +ethers = "2.0" +tokio = { workspace = true, features = ["full"] } +sui-types.workspace = true +prometheus.workspace = true +async-trait.workspace = true +sui-data-ingestion-core.workspace = true +sui-bridge.workspace = true +clap.workspace = true +tracing.workspace = true +bin-version.workspace = true +anyhow.workspace = true +mysten-metrics.workspace = true +bcs.workspace = true +serde_yaml.workspace = true + +[dev-dependencies] +sui-types = { workspace = true, features = ["test-utils"] } +sui-config.workspace = true +sui-test-transaction-builder.workspace = true +test-cluster.workspace = true +hex-literal = "0.3.4" + +[[bin]] +name = "bridge-indexer" +path = "src/main.rs" diff --git a/crates/sui-bridge-indexer/config.yaml b/crates/sui-bridge-indexer/config.yaml new file mode 100644 index 0000000000000..f4644c7a7c71e --- /dev/null +++ b/crates/sui-bridge-indexer/config.yaml @@ -0,0 +1,18 @@ +# config.yaml format: + +# URL of the remote store +# remote_store_url: +# URL for Ethereum RPC +# eth_rpc_url: +# Database connection URL +# db_url: +# File to store the progress +# progress_store_file: +# Path to the checkpoints +# checkpoints_path: +# Number of concurrent operations +# concurrency: 1 +# Ethereum to Sui bridge contract address +# eth_sui_bridge_contract_address: +# Starting block number +# start_block: \ No newline at end of file diff --git a/crates/sui-bridge-indexer/diesel.toml b/crates/sui-bridge-indexer/diesel.toml new file mode 100644 index 0000000000000..061dc413ebe1f --- /dev/null +++ b/crates/sui-bridge-indexer/diesel.toml @@ -0,0 +1,9 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/schema.rs" +#custom_type_derives = ["diesel::query_builder::QueryId"] + +[migrations_directory] +dir = "src/migrations" diff --git a/crates/sui-bridge-indexer/src/config.rs b/crates/sui-bridge-indexer/src/config.rs new file mode 100644 index 0000000000000..1909ac71f0b48 --- /dev/null +++ b/crates/sui-bridge-indexer/src/config.rs @@ -0,0 +1,26 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use serde::Deserialize; +use std::{fs, path::Path}; + +/// config as loaded from `config.yaml`. +#[derive(Debug, Deserialize)] +pub struct Config { + pub remote_store_url: String, + pub eth_rpc_url: String, + pub db_url: String, + pub progress_store_file: String, + pub checkpoints_path: String, + pub concurrency: u64, + pub eth_sui_bridge_contract_address: String, + pub start_block: u64, +} + +/// Load the config to run. +pub fn load_config(path: &Path) -> Result { + let reader = fs::File::open(path)?; + let config: Config = serde_yaml::from_reader(reader)?; + Ok(config) +} diff --git a/crates/sui-bridge-indexer/src/lib.rs b/crates/sui-bridge-indexer/src/lib.rs new file mode 100644 index 0000000000000..97bee58203d8c --- /dev/null +++ b/crates/sui-bridge-indexer/src/lib.rs @@ -0,0 +1,105 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::models::TokenTransfer as DBTokenTransfer; +use crate::models::TokenTransferData as DBTokenTransferData; +use anyhow::anyhow; +use std::fmt::{Display, Formatter}; + +pub mod config; +pub mod models; +pub mod postgres_writer; +pub mod schema; +pub mod worker; + +pub struct TokenTransfer { + chain_id: u8, + nonce: u64, + block_height: u64, + timestamp_ms: u64, + txn_hash: Vec, + status: TokenTransferStatus, + gas_usage: i64, + data_source: BridgeDataSource, + data: Option, +} + +pub struct TokenTransferData { + sender_address: Vec, + destination_chain: u8, + recipient_address: Vec, + token_id: u8, + amount: u64, +} + +impl From for DBTokenTransfer { + fn from(value: TokenTransfer) -> Self { + DBTokenTransfer { + chain_id: value.chain_id as i32, + nonce: value.nonce as i64, + block_height: value.block_height as i64, + timestamp_ms: value.timestamp_ms as i64, + txn_hash: value.txn_hash, + status: value.status.to_string(), + gas_usage: value.gas_usage, + data_source: value.data_source.to_string(), + } + } +} + +impl TryFrom<&TokenTransfer> for DBTokenTransferData { + type Error = anyhow::Error; + + fn try_from(value: &TokenTransfer) -> Result { + value + .data + .as_ref() + .ok_or(anyhow!( + "Data is empty for TokenTransfer: chain_id = {}, nonce = {}, status = {}", + value.chain_id, + value.nonce, + value.status + )) + .map(|data| DBTokenTransferData { + chain_id: value.chain_id as i32, + nonce: value.nonce as i64, + sender_address: data.sender_address.clone(), + destination_chain: data.destination_chain as i32, + recipient_address: data.recipient_address.clone(), + token_id: data.token_id as i32, + amount: data.amount as i64, + }) + } +} + +pub(crate) enum TokenTransferStatus { + Deposited, + Approved, + Claimed, +} + +impl Display for TokenTransferStatus { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str = match self { + TokenTransferStatus::Deposited => "Deposited", + TokenTransferStatus::Approved => "Approved", + TokenTransferStatus::Claimed => "Claimed", + }; + write!(f, "{str}") + } +} + +enum BridgeDataSource { + Sui, + Eth, +} + +impl Display for BridgeDataSource { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let str = match self { + BridgeDataSource::Eth => "ETH", + BridgeDataSource::Sui => "SUI", + }; + write!(f, "{str}") + } +} diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs new file mode 100644 index 0000000000000..81abfaba04d13 --- /dev/null +++ b/crates/sui-bridge-indexer/src/main.rs @@ -0,0 +1,127 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use clap::*; +use ethers::types::Address as EthAddress; +use mysten_metrics::spawn_logged_monitored_task; +use mysten_metrics::start_prometheus_server; +use prometheus::Registry; +use std::collections::HashMap; +use std::collections::HashSet; +use std::env; +use std::net::IpAddr; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use sui_bridge::{ + abi::{EthBridgeCommittee, EthSuiBridge}, + eth_client::EthClient, + eth_syncer::EthSyncer, +}; +use sui_bridge_indexer::postgres_writer::get_connection_pool; +use sui_bridge_indexer::{ + config::load_config, worker::process_eth_transaction, worker::BridgeWorker, +}; +use sui_data_ingestion_core::{ + DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool, +}; +use tokio::sync::oneshot; +use tracing::info; + +#[derive(Parser, Clone, Debug)] +struct Args { + /// Path to a yaml config + #[clap(long, short)] + config_path: Option, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + // load config + let config_path = if let Some(path) = args.config_path { + path.join("config.yaml") + } else { + env::current_dir().unwrap().join("config.yaml") + }; + + let config = load_config(&config_path).unwrap(); + + // start metrics server + let (_exit_sender, exit_receiver) = oneshot::channel(); + let metrics = DataIngestionMetrics::new(&Registry::new()); + + // Init metrics server + let metrics_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1000); + let registry_service = start_prometheus_server(metrics_address); + let prometheus_registry = registry_service.default_registry(); + mysten_metrics::init_metrics(&prometheus_registry); + info!("Metrics server started at port {}", 1000); + + // start eth client + let provider = Arc::new( + ethers::prelude::Provider::::try_from(&config.eth_rpc_url) + .unwrap() + .interval(std::time::Duration::from_millis(2000)), + ); + let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?; + let sui_bridge = EthSuiBridge::new(bridge_address, provider.clone()); + let committee_address: EthAddress = sui_bridge.committee().call().await?; + let limiter_address: EthAddress = sui_bridge.limiter().call().await?; + let vault_address: EthAddress = sui_bridge.vault().call().await?; + let committee = EthBridgeCommittee::new(committee_address, provider.clone()); + let config_address: EthAddress = committee.config().call().await?; + + let eth_client = Arc::new( + EthClient::::new( + &config.eth_rpc_url, + HashSet::from_iter(vec![ + bridge_address, + committee_address, + config_address, + limiter_address, + vault_address, + ]), + ) + .await?, + ); + + let contract_addresses = HashMap::from_iter(vec![(bridge_address, config.start_block)]); + + let (_task_handles, eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses) + .run() + .await + .expect("Failed to start eth syncer"); + + let pg_pool = get_connection_pool(config.db_url.clone()); + + let _task_handle = spawn_logged_monitored_task!( + process_eth_transaction(eth_events_rx, provider.clone(), pg_pool), + "indexer handler" + ); + + // start sui side + let progress_store = FileProgressStore::new(config.progress_store_file.into()); + let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics); + let worker_pool = WorkerPool::new( + BridgeWorker::new(vec![], config.db_url.clone()), + "bridge worker".into(), + config.concurrency as usize, + ); + executor.register(worker_pool).await?; + executor + .run( + config.checkpoints_path.into(), + Some(config.remote_store_url), + vec![], // optional remote store access options + ReaderOptions::default(), + exit_receiver, + ) + .await?; + + Ok(()) +} diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql new file mode 100644 index 0000000000000..2550cdf407015 --- /dev/null +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql @@ -0,0 +1,3 @@ +-- This file should undo anything in `up.sql` +DROP TABLE IF EXISTS token_transfer; +DROP TABLE IF EXISTS token_transfer_data; diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql new file mode 100644 index 0000000000000..fffdaa5e6234b --- /dev/null +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -0,0 +1,26 @@ +CREATE TABLE token_transfer_data +( + chain_id INT NOT NULL, + nonce BIGINT NOT NULL, + sender_address bytea NOT NULL, + destination_chain INT NOT NULL, + recipient_address bytea NOT NULL, + token_id INT NOT NULL, + amount BIGINT NOT NULL, + PRIMARY KEY(chain_id, nonce) +); +CREATE INDEX token_transfer_data_destination_chain ON token_transfer_data (destination_chain); +CREATE INDEX token_transfer_data_token_id ON token_transfer_data (token_id); + +CREATE TABLE token_transfer +( + chain_id INT NOT NULL, + nonce BIGINT NOT NULL, + status TEXT NOT NULL, + block_height BIGINT NOT NULL, + timestamp_ms BIGINT NOT NULL, + txn_hash bytea NOT NULL, + gas_usage BIGINT NOT NULL, + data_source TEXT NOT NULL, + PRIMARY KEY(chain_id, nonce, status) +); \ No newline at end of file diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs new file mode 100644 index 0000000000000..ee0bcc88ab646 --- /dev/null +++ b/crates/sui-bridge-indexer/src/models.rs @@ -0,0 +1,30 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::schema::{token_transfer, token_transfer_data}; +use diesel::{Identifiable, Insertable, Queryable, Selectable}; + +#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] +#[diesel(table_name = token_transfer, primary_key(chain_id, nonce))] +pub struct TokenTransfer { + pub chain_id: i32, + pub nonce: i64, + pub status: String, + pub block_height: i64, + pub timestamp_ms: i64, + pub txn_hash: Vec, + pub gas_usage: i64, + pub data_source: String, +} + +#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] +#[diesel(table_name = token_transfer_data, primary_key(chain_id, nonce))] +pub struct TokenTransferData { + pub chain_id: i32, + pub nonce: i64, + pub sender_address: Vec, + pub destination_chain: i32, + pub recipient_address: Vec, + pub token_id: i32, + pub amount: i64, +} diff --git a/crates/sui-bridge-indexer/src/postgres_writer.rs b/crates/sui-bridge-indexer/src/postgres_writer.rs new file mode 100644 index 0000000000000..22e8c033ec295 --- /dev/null +++ b/crates/sui-bridge-indexer/src/postgres_writer.rs @@ -0,0 +1,40 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::models::TokenTransfer as DBTokenTransfer; +use crate::models::TokenTransferData as DBTokenTransferData; +use crate::schema::token_transfer_data; +use crate::{schema::token_transfer, TokenTransfer}; +use diesel::{ + pg::PgConnection, + r2d2::{ConnectionManager, Pool}, + Connection, RunQueryDsl, +}; + +pub(crate) type PgPool = Pool>; + +pub fn get_connection_pool(database_url: String) -> PgPool { + let manager = ConnectionManager::::new(database_url); + Pool::builder() + .test_on_check_out(true) + .build(manager) + .expect("Could not build Postgres DB connection pool") +} + +pub fn write(pool: &PgPool, token: TokenTransfer) { + let connection = &mut pool.get().unwrap(); + connection + .transaction(|conn| { + if let Ok(data) = DBTokenTransferData::try_from(&token) { + diesel::insert_into(token_transfer_data::table) + .values(data) + .on_conflict_do_nothing() + .execute(conn)?; + }; + diesel::insert_into(token_transfer::table) + .values(DBTokenTransfer::from(token)) + .on_conflict_do_nothing() + .execute(conn) + }) + .expect("Failed to start connection to DB"); +} diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs new file mode 100644 index 0000000000000..e13e095c22654 --- /dev/null +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -0,0 +1,31 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// @generated automatically by Diesel CLI. + +diesel::table! { + token_transfer (chain_id, nonce, status) { + chain_id -> Int4, + nonce -> Int8, + status -> Text, + block_height -> Int8, + timestamp_ms -> Int8, + txn_hash -> Bytea, + gas_usage -> Int8, + data_source -> Text, + } +} + +diesel::table! { + token_transfer_data (chain_id, nonce) { + chain_id -> Int4, + nonce -> Int8, + sender_address -> Bytea, + destination_chain -> Int4, + recipient_address -> Bytea, + token_id -> Int4, + amount -> Int8, + } +} + +diesel::allow_tables_to_appear_in_same_query!(token_transfer, token_transfer_data,); diff --git a/crates/sui-bridge-indexer/src/worker.rs b/crates/sui-bridge-indexer/src/worker.rs new file mode 100644 index 0000000000000..b05bba87ffdda --- /dev/null +++ b/crates/sui-bridge-indexer/src/worker.rs @@ -0,0 +1,229 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::postgres_writer::{get_connection_pool, write, PgPool}; +use crate::{BridgeDataSource, TokenTransfer, TokenTransferData, TokenTransferStatus}; +use anyhow::Result; +use async_trait::async_trait; +use ethers::providers::Provider; +use ethers::providers::{Http, Middleware}; +use ethers::types::Address as EthAddress; +use std::collections::BTreeSet; +use std::sync::Arc; +use sui_bridge::abi::{EthBridgeEvent, EthSuiBridgeEvents}; +use sui_bridge::events::{ + MoveTokenDepositedEvent, MoveTokenTransferApproved, MoveTokenTransferClaimed, +}; +use sui_bridge::types::EthLog; +use sui_data_ingestion_core::Worker; +use sui_types::effects::TransactionEffectsAPI; +use sui_types::{ + base_types::ObjectID, + full_checkpoint_content::{CheckpointData, CheckpointTransaction}, + transaction::{TransactionDataAPI, TransactionKind}, + BRIDGE_ADDRESS, SUI_BRIDGE_OBJECT_ID, +}; +use tracing::info; + +pub struct BridgeWorker { + bridge_object_ids: BTreeSet, + pg_pool: PgPool, +} + +impl BridgeWorker { + pub fn new(bridge_object_ids: Vec, db_url: String) -> Self { + let mut bridge_object_ids = bridge_object_ids.into_iter().collect::>(); + bridge_object_ids.insert(SUI_BRIDGE_OBJECT_ID); + let pg_pool = get_connection_pool(db_url); + Self { + bridge_object_ids, + pg_pool, + } + } + + // Return true if the transaction relates to the bridge and is of interest. + fn is_bridge_transaction(&self, tx: &CheckpointTransaction) -> bool { + // TODO: right now this returns true for programmable transactions that + // have the bridge object as input. We can extend later to cover other cases + let txn_data = tx.transaction.transaction_data(); + if let TransactionKind::ProgrammableTransaction(_pt) = txn_data.kind() { + return tx + .input_objects + .iter() + .any(|obj| self.bridge_object_ids.contains(&obj.id())); + }; + false + } + + // Process a transaction that has been identified as a bridge transaction. + fn process_transaction(&self, tx: &CheckpointTransaction, checkpoint: u64, timestamp_ms: u64) { + if let Some(event) = &tx.events { + event.data.iter().for_each(|ev| { + if ev.type_.address == BRIDGE_ADDRESS { + let token_transfer = match ev.type_.name.as_str() { + "TokenDepositedEvent" => { + println!("Observed Sui Deposit"); + // todo: handle deserialization error + let event: MoveTokenDepositedEvent = + bcs::from_bytes(&ev.contents).unwrap(); + Some(TokenTransfer { + chain_id: event.source_chain, + nonce: event.seq_num, + block_height: checkpoint, + timestamp_ms, + txn_hash: tx.transaction.digest().inner().to_vec(), + status: TokenTransferStatus::Deposited, + gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::Sui, + data: Some(TokenTransferData { + sender_address: event.sender_address, + destination_chain: event.target_chain, + recipient_address: event.target_address, + token_id: event.token_type, + amount: event.amount_sui_adjusted, + }), + }) + } + "TokenTransferApproved" => { + println!("Observed Sui Approval"); + let event: MoveTokenTransferApproved = + bcs::from_bytes(&ev.contents).unwrap(); + Some(TokenTransfer { + chain_id: event.message_key.source_chain, + nonce: event.message_key.bridge_seq_num, + block_height: checkpoint, + timestamp_ms, + txn_hash: tx.transaction.digest().inner().to_vec(), + status: TokenTransferStatus::Approved, + gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::Sui, + data: None, + }) + } + "TokenTransferClaimed" => { + println!("Observed Sui Claim"); + let event: MoveTokenTransferClaimed = + bcs::from_bytes(&ev.contents).unwrap(); + Some(TokenTransfer { + chain_id: event.message_key.source_chain, + nonce: event.message_key.bridge_seq_num, + block_height: checkpoint, + timestamp_ms, + txn_hash: tx.transaction.digest().inner().to_vec(), + status: TokenTransferStatus::Claimed, + gas_usage: tx.effects.gas_cost_summary().net_gas_usage(), + data_source: BridgeDataSource::Sui, + data: None, + }) + } + _ => None, + }; + + if let Some(transfer) = token_transfer { + println!("SUI: Storing bridge event : {:?}", ev.type_); + write(&self.pg_pool, transfer); + } + }; + }); + } + } +} + +pub async fn process_eth_transaction( + mut eth_events_rx: mysten_metrics::metered_channel::Receiver<(EthAddress, u64, Vec)>, + provider: Arc>, + pool: PgPool, +) { + while let Some(event) = eth_events_rx.recv().await { + for log in event.2.iter() { + let eth_bridge_event = EthBridgeEvent::try_from_eth_log(log); + if eth_bridge_event.is_none() { + continue; + } + let bridge_event = eth_bridge_event.unwrap(); + let block_number = log.block_number; + let block = provider.get_block(log.block_number).await.unwrap().unwrap(); + let timestamp = block.timestamp.as_u64() * 1000; + let transaction = provider + .get_transaction(log.tx_hash) + .await + .unwrap() + .unwrap(); + let gas = transaction.gas; + let tx_hash = log.tx_hash; + + println!("Observed Eth bridge event: {:#?}", bridge_event); + + match bridge_event { + EthBridgeEvent::EthSuiBridgeEvents(bridge_event) => match bridge_event { + EthSuiBridgeEvents::TokensDepositedFilter(bridge_event) => { + println!("Observed Eth Deposit"); + let transfer = TokenTransfer { + chain_id: bridge_event.source_chain_id, + nonce: bridge_event.nonce, + block_height: block_number, + timestamp_ms: timestamp, + txn_hash: tx_hash.as_bytes().to_vec(), + status: TokenTransferStatus::Deposited, + gas_usage: gas.as_u64() as i64, + data_source: BridgeDataSource::Eth, + data: Some(TokenTransferData { + sender_address: bridge_event.sender_address.as_bytes().to_vec(), + destination_chain: bridge_event.destination_chain_id, + recipient_address: bridge_event.recipient_address.to_vec(), + token_id: bridge_event.token_id, + amount: bridge_event.sui_adjusted_amount, + }), + }; + + write(&pool, transfer); + } + EthSuiBridgeEvents::TokensClaimedFilter(bridge_event) => { + println!("Observed Eth Claim"); + let transfer = TokenTransfer { + chain_id: bridge_event.source_chain_id, + nonce: bridge_event.nonce, + block_height: block_number, + timestamp_ms: timestamp, + txn_hash: tx_hash.as_bytes().to_vec(), + status: TokenTransferStatus::Claimed, + gas_usage: gas.as_u64() as i64, + data_source: BridgeDataSource::Eth, + data: None, + }; + + write(&pool, transfer); + } + EthSuiBridgeEvents::PausedFilter(_) + | EthSuiBridgeEvents::UnpausedFilter(_) + | EthSuiBridgeEvents::UpgradedFilter(_) + | EthSuiBridgeEvents::InitializedFilter(_) => (), + }, + EthBridgeEvent::EthBridgeCommitteeEvents(_) + | EthBridgeEvent::EthBridgeLimiterEvents(_) + | EthBridgeEvent::EthBridgeConfigEvents(_) + | EthBridgeEvent::EthCommitteeUpgradeableContractEvents(_) => (), + } + } + } +} + +#[async_trait] +impl Worker for BridgeWorker { + async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> { + info!( + "Processing checkpoint [{}] {}: {}", + checkpoint.checkpoint_summary.epoch, + checkpoint.checkpoint_summary.sequence_number, + checkpoint.transactions.len(), + ); + let checkpoint_num = checkpoint.checkpoint_summary.sequence_number; + let timestamp_ms = checkpoint.checkpoint_summary.timestamp_ms; + checkpoint + .transactions + .iter() + .filter(|txn| self.is_bridge_transaction(txn)) + .for_each(|txn| self.process_transaction(txn, checkpoint_num, timestamp_ms)); + Ok(()) + } +}