Skip to content

Commit

Permalink
Bridge indexer (#17870)
Browse files Browse the repository at this point in the history
## Description 

Adds the sui-bridge-indexer crate.

---------

Co-authored-by: patrick <[email protected]>
Co-authored-by: Dario Russi <[email protected]>
  • Loading branch information
3 people committed May 24, 2024
1 parent 856d59f commit 5137c6e
Show file tree
Hide file tree
Showing 14 changed files with 707 additions and 0 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ members = [
"crates/sui-benchmark",
"crates/sui-bridge",
"crates/sui-bridge-cli",
"crates/sui-bridge-indexer",
"crates/sui-cluster-test",
"crates/sui-common",
"crates/sui-config",
Expand Down
36 changes: 36 additions & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "sui-bridge-indexer"
version = "0.1.0"
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
serde.workspace = true
diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] }
ethers = "2.0"
tokio = { workspace = true, features = ["full"] }
sui-types.workspace = true
prometheus.workspace = true
async-trait.workspace = true
sui-data-ingestion-core.workspace = true
sui-bridge.workspace = true
clap.workspace = true
tracing.workspace = true
bin-version.workspace = true
anyhow.workspace = true
mysten-metrics.workspace = true
bcs.workspace = true
serde_yaml.workspace = true

[dev-dependencies]
sui-types = { workspace = true, features = ["test-utils"] }
sui-config.workspace = true
sui-test-transaction-builder.workspace = true
test-cluster.workspace = true
hex-literal = "0.3.4"

[[bin]]
name = "bridge-indexer"
path = "src/main.rs"
18 changes: 18 additions & 0 deletions crates/sui-bridge-indexer/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# config.yaml format:

# URL of the remote store
# remote_store_url: <url>
# URL for Ethereum RPC
# eth_rpc_url: <url>
# Database connection URL
# db_url: <url>
# File to store the progress
# progress_store_file: <path>
# Path to the checkpoints
# checkpoints_path: <path>
# Number of concurrent operations
# concurrency: 1
# Ethereum to Sui bridge contract address
# eth_sui_bridge_contract_address: <contract_address>
# Starting block number
# start_block: <indexing_start_block>
9 changes: 9 additions & 0 deletions crates/sui-bridge-indexer/diesel.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# For documentation on how to configure this file,
# see https://diesel.rs/guides/configuring-diesel-cli

[print_schema]
file = "src/schema.rs"
#custom_type_derives = ["diesel::query_builder::QueryId"]

[migrations_directory]
dir = "src/migrations"
26 changes: 26 additions & 0 deletions crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use serde::Deserialize;
use std::{fs, path::Path};

/// config as loaded from `config.yaml`.
#[derive(Debug, Deserialize)]
pub struct Config {
pub remote_store_url: String,
pub eth_rpc_url: String,
pub db_url: String,
pub progress_store_file: String,
pub checkpoints_path: String,
pub concurrency: u64,
pub eth_sui_bridge_contract_address: String,
pub start_block: u64,
}

/// Load the config to run.
pub fn load_config(path: &Path) -> Result<Config> {
let reader = fs::File::open(path)?;
let config: Config = serde_yaml::from_reader(reader)?;
Ok(config)
}
105 changes: 105 additions & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::models::TokenTransfer as DBTokenTransfer;
use crate::models::TokenTransferData as DBTokenTransferData;
use anyhow::anyhow;
use std::fmt::{Display, Formatter};

pub mod config;
pub mod models;
pub mod postgres_writer;
pub mod schema;
pub mod worker;

pub struct TokenTransfer {
chain_id: u8,
nonce: u64,
block_height: u64,
timestamp_ms: u64,
txn_hash: Vec<u8>,
status: TokenTransferStatus,
gas_usage: i64,
data_source: BridgeDataSource,
data: Option<TokenTransferData>,
}

pub struct TokenTransferData {
sender_address: Vec<u8>,
destination_chain: u8,
recipient_address: Vec<u8>,
token_id: u8,
amount: u64,
}

impl From<TokenTransfer> for DBTokenTransfer {
fn from(value: TokenTransfer) -> Self {
DBTokenTransfer {
chain_id: value.chain_id as i32,
nonce: value.nonce as i64,
block_height: value.block_height as i64,
timestamp_ms: value.timestamp_ms as i64,
txn_hash: value.txn_hash,
status: value.status.to_string(),
gas_usage: value.gas_usage,
data_source: value.data_source.to_string(),
}
}
}

impl TryFrom<&TokenTransfer> for DBTokenTransferData {
type Error = anyhow::Error;

fn try_from(value: &TokenTransfer) -> Result<Self, Self::Error> {
value
.data
.as_ref()
.ok_or(anyhow!(
"Data is empty for TokenTransfer: chain_id = {}, nonce = {}, status = {}",
value.chain_id,
value.nonce,
value.status
))
.map(|data| DBTokenTransferData {
chain_id: value.chain_id as i32,
nonce: value.nonce as i64,
sender_address: data.sender_address.clone(),
destination_chain: data.destination_chain as i32,
recipient_address: data.recipient_address.clone(),
token_id: data.token_id as i32,
amount: data.amount as i64,
})
}
}

pub(crate) enum TokenTransferStatus {
Deposited,
Approved,
Claimed,
}

impl Display for TokenTransferStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let str = match self {
TokenTransferStatus::Deposited => "Deposited",
TokenTransferStatus::Approved => "Approved",
TokenTransferStatus::Claimed => "Claimed",
};
write!(f, "{str}")
}
}

enum BridgeDataSource {
Sui,
Eth,
}

impl Display for BridgeDataSource {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let str = match self {
BridgeDataSource::Eth => "ETH",
BridgeDataSource::Sui => "SUI",
};
write!(f, "{str}")
}
}
127 changes: 127 additions & 0 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use clap::*;
use ethers::types::Address as EthAddress;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::{
abi::{EthBridgeCommittee, EthSuiBridge},
eth_client::EthClient,
eth_syncer::EthSyncer,
};
use sui_bridge_indexer::postgres_writer::get_connection_pool;
use sui_bridge_indexer::{
config::load_config, worker::process_eth_transaction, worker::BridgeWorker,
};
use sui_data_ingestion_core::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool,
};
use tokio::sync::oneshot;
use tracing::info;

#[derive(Parser, Clone, Debug)]
struct Args {
/// Path to a yaml config
#[clap(long, short)]
config_path: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();

// load config
let config_path = if let Some(path) = args.config_path {
path.join("config.yaml")
} else {
env::current_dir().unwrap().join("config.yaml")
};

let config = load_config(&config_path).unwrap();

// start metrics server
let (_exit_sender, exit_receiver) = oneshot::channel();
let metrics = DataIngestionMetrics::new(&Registry::new());

// Init metrics server
let metrics_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1000);
let registry_service = start_prometheus_server(metrics_address);
let prometheus_registry = registry_service.default_registry();
mysten_metrics::init_metrics(&prometheus_registry);
info!("Metrics server started at port {}", 1000);

// start eth client
let provider = Arc::new(
ethers::prelude::Provider::<ethers::providers::Http>::try_from(&config.eth_rpc_url)
.unwrap()
.interval(std::time::Duration::from_millis(2000)),
);
let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
let sui_bridge = EthSuiBridge::new(bridge_address, provider.clone());
let committee_address: EthAddress = sui_bridge.committee().call().await?;
let limiter_address: EthAddress = sui_bridge.limiter().call().await?;
let vault_address: EthAddress = sui_bridge.vault().call().await?;
let committee = EthBridgeCommittee::new(committee_address, provider.clone());
let config_address: EthAddress = committee.config().call().await?;

let eth_client = Arc::new(
EthClient::<ethers::providers::Http>::new(
&config.eth_rpc_url,
HashSet::from_iter(vec![
bridge_address,
committee_address,
config_address,
limiter_address,
vault_address,
]),
)
.await?,
);

let contract_addresses = HashMap::from_iter(vec![(bridge_address, config.start_block)]);

let (_task_handles, eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses)
.run()
.await
.expect("Failed to start eth syncer");

let pg_pool = get_connection_pool(config.db_url.clone());

let _task_handle = spawn_logged_monitored_task!(
process_eth_transaction(eth_events_rx, provider.clone(), pg_pool),
"indexer handler"
);

// start sui side
let progress_store = FileProgressStore::new(config.progress_store_file.into());
let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics);
let worker_pool = WorkerPool::new(
BridgeWorker::new(vec![], config.db_url.clone()),
"bridge worker".into(),
config.concurrency as usize,
);
executor.register(worker_pool).await?;
executor
.run(
config.checkpoints_path.into(),
Some(config.remote_store_url),
vec![], // optional remote store access options
ReaderOptions::default(),
exit_receiver,
)
.await?;

Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS token_transfer;
DROP TABLE IF EXISTS token_transfer_data;
Loading

0 comments on commit 5137c6e

Please sign in to comment.