Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bridge indexer #17870

Merged
merged 13 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ members = [
"crates/sui-benchmark",
"crates/sui-bridge",
"crates/sui-bridge-cli",
"crates/sui-bridge-indexer",
"crates/sui-cluster-test",
"crates/sui-common",
"crates/sui-config",
Expand Down
36 changes: 36 additions & 0 deletions crates/sui-bridge-indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
[package]
name = "sui-bridge-indexer"
version = "0.1.0"
authors = ["Mysten Labs <[email protected]>"]
license = "Apache-2.0"
publish = false
edition = "2021"

[dependencies]
serde.workspace = true
diesel = { version = "2.1.4", features = ["postgres", "r2d2", "serde_json"] }
ethers = "2.0"
tokio = { workspace = true, features = ["full"] }
sui-types.workspace = true
prometheus.workspace = true
async-trait.workspace = true
sui-data-ingestion-core.workspace = true
sui-bridge.workspace = true
clap.workspace = true
tracing.workspace = true
bin-version.workspace = true
anyhow.workspace = true
mysten-metrics.workspace = true
bcs.workspace = true
serde_yaml.workspace = true

[dev-dependencies]
sui-types = { workspace = true, features = ["test-utils"] }
sui-config.workspace = true
sui-test-transaction-builder.workspace = true
test-cluster.workspace = true
hex-literal = "0.3.4"

[[bin]]
name = "bridge-indexer"
path = "src/main.rs"
18 changes: 18 additions & 0 deletions crates/sui-bridge-indexer/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# config.yaml format:

# URL of the remote store
# remote_store_url: <url>
# URL for Ethereum RPC
# eth_rpc_url: <url>
# Database connection URL
# db_url: <url>
# File to store the progress
# progress_store_file: <path>
# Path to the checkpoints
# checkpoints_path: <path>
# Number of concurrent operations
# concurrency: 1
# Ethereum to Sui bridge contract address
# eth_sui_bridge_contract_address: <contract_address>
# Starting block number
# start_block: <indexing_start_block>
9 changes: 9 additions & 0 deletions crates/sui-bridge-indexer/diesel.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# For documentation on how to configure this file,
# see https://diesel.rs/guides/configuring-diesel-cli

[print_schema]
file = "src/schema.rs"
#custom_type_derives = ["diesel::query_builder::QueryId"]

[migrations_directory]
dir = "src/migrations"
26 changes: 26 additions & 0 deletions crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use serde::Deserialize;
use std::{fs, path::Path};

/// config as loaded from `config.yaml`.
#[derive(Debug, Deserialize)]
pub struct Config {
pub remote_store_url: String,
pub eth_rpc_url: String,
pub db_url: String,
pub progress_store_file: String,
pub checkpoints_path: String,
pub concurrency: u64,
pub eth_sui_bridge_contract_address: String,
pub start_block: u64,
}

/// Load the config to run.
pub fn load_config(path: &Path) -> Result<Config> {
let reader = fs::File::open(path)?;
let config: Config = serde_yaml::from_reader(reader)?;
Ok(config)
}
105 changes: 105 additions & 0 deletions crates/sui-bridge-indexer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::models::TokenTransfer as DBTokenTransfer;
use crate::models::TokenTransferData as DBTokenTransferData;
use anyhow::anyhow;
use std::fmt::{Display, Formatter};

pub mod config;
pub mod models;
pub mod postgres_writer;
pub mod schema;
pub mod worker;

pub struct TokenTransfer {
chain_id: u8,
nonce: u64,
block_height: u64,
timestamp_ms: u64,
txn_hash: Vec<u8>,
status: TokenTransferStatus,
gas_usage: i64,
data_source: BridgeDataSource,
data: Option<TokenTransferData>,
}

pub struct TokenTransferData {
sender_address: Vec<u8>,
destination_chain: u8,
recipient_address: Vec<u8>,
token_id: u8,
amount: u64,
}

impl From<TokenTransfer> for DBTokenTransfer {
fn from(value: TokenTransfer) -> Self {
DBTokenTransfer {
chain_id: value.chain_id as i32,
nonce: value.nonce as i64,
block_height: value.block_height as i64,
timestamp_ms: value.timestamp_ms as i64,
txn_hash: value.txn_hash,
status: value.status.to_string(),
gas_usage: value.gas_usage,
data_source: value.data_source.to_string(),
}
}
}

impl TryFrom<&TokenTransfer> for DBTokenTransferData {
type Error = anyhow::Error;

fn try_from(value: &TokenTransfer) -> Result<Self, Self::Error> {
value
.data
.as_ref()
.ok_or(anyhow!(
"Data is empty for TokenTransfer: chain_id = {}, nonce = {}, status = {}",
value.chain_id,
value.nonce,
value.status
))
.map(|data| DBTokenTransferData {
chain_id: value.chain_id as i32,
nonce: value.nonce as i64,
sender_address: data.sender_address.clone(),
destination_chain: data.destination_chain as i32,
recipient_address: data.recipient_address.clone(),
token_id: data.token_id as i32,
amount: data.amount as i64,
})
}
}

pub(crate) enum TokenTransferStatus {
Deposited,
Approved,
Claimed,
}

impl Display for TokenTransferStatus {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let str = match self {
TokenTransferStatus::Deposited => "Deposited",
TokenTransferStatus::Approved => "Approved",
TokenTransferStatus::Claimed => "Claimed",
};
write!(f, "{str}")
}
}

enum BridgeDataSource {
Sui,
Eth,
}

impl Display for BridgeDataSource {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let str = match self {
BridgeDataSource::Eth => "ETH",
BridgeDataSource::Sui => "SUI",
};
write!(f, "{str}")
}
}
127 changes: 127 additions & 0 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use clap::*;
use ethers::types::Address as EthAddress;
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::start_prometheus_server;
use prometheus::Registry;
use std::collections::HashMap;
use std::collections::HashSet;
use std::env;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::{
abi::{EthBridgeCommittee, EthSuiBridge},
eth_client::EthClient,
eth_syncer::EthSyncer,
};
use sui_bridge_indexer::postgres_writer::get_connection_pool;
use sui_bridge_indexer::{
config::load_config, worker::process_eth_transaction, worker::BridgeWorker,
};
use sui_data_ingestion_core::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool,
};
use tokio::sync::oneshot;
use tracing::info;

#[derive(Parser, Clone, Debug)]
struct Args {
/// Path to a yaml config
#[clap(long, short)]
config_path: Option<PathBuf>,
}

#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();

// load config
let config_path = if let Some(path) = args.config_path {
path.join("config.yaml")
} else {
env::current_dir().unwrap().join("config.yaml")
};

let config = load_config(&config_path).unwrap();

// start metrics server
let (_exit_sender, exit_receiver) = oneshot::channel();
let metrics = DataIngestionMetrics::new(&Registry::new());

// Init metrics server
let metrics_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1000);
let registry_service = start_prometheus_server(metrics_address);
let prometheus_registry = registry_service.default_registry();
mysten_metrics::init_metrics(&prometheus_registry);
info!("Metrics server started at port {}", 1000);

// start eth client
let provider = Arc::new(
ethers::prelude::Provider::<ethers::providers::Http>::try_from(&config.eth_rpc_url)
.unwrap()
.interval(std::time::Duration::from_millis(2000)),
);
let bridge_address = EthAddress::from_str(&config.eth_sui_bridge_contract_address)?;
let sui_bridge = EthSuiBridge::new(bridge_address, provider.clone());
let committee_address: EthAddress = sui_bridge.committee().call().await?;
let limiter_address: EthAddress = sui_bridge.limiter().call().await?;
let vault_address: EthAddress = sui_bridge.vault().call().await?;
let committee = EthBridgeCommittee::new(committee_address, provider.clone());
let config_address: EthAddress = committee.config().call().await?;

let eth_client = Arc::new(
EthClient::<ethers::providers::Http>::new(
&config.eth_rpc_url,
HashSet::from_iter(vec![
bridge_address,
committee_address,
config_address,
limiter_address,
vault_address,
]),
)
.await?,
);

let contract_addresses = HashMap::from_iter(vec![(bridge_address, config.start_block)]);

let (_task_handles, eth_events_rx, _) = EthSyncer::new(eth_client, contract_addresses)
.run()
.await
.expect("Failed to start eth syncer");

let pg_pool = get_connection_pool(config.db_url.clone());

let _task_handle = spawn_logged_monitored_task!(
process_eth_transaction(eth_events_rx, provider.clone(), pg_pool),
"indexer handler"
);

// start sui side
let progress_store = FileProgressStore::new(config.progress_store_file.into());
let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics);
let worker_pool = WorkerPool::new(
BridgeWorker::new(vec![], config.db_url.clone()),
"bridge worker".into(),
config.concurrency as usize,
);
executor.register(worker_pool).await?;
executor
.run(
config.checkpoints_path.into(),
Some(config.remote_store_url),
vec![], // optional remote store access options
ReaderOptions::default(),
exit_receiver,
)
.await?;

Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS token_transfer;
DROP TABLE IF EXISTS token_transfer_data;
Loading
Loading