Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions crates/sui-bridge-indexer/src/eth_bridge_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use sui_bridge::error::BridgeError;
use sui_bridge::eth_client::EthClient;
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
use sui_bridge::retry_with_max_elapsed_time;
use sui_indexer_builder::Task;
use tokio::task::JoinHandle;
use tracing::info;

Expand Down Expand Up @@ -63,14 +64,13 @@ impl EthSubscriptionDatasource {
impl Datasource<RawEthData> for EthSubscriptionDatasource {
async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that

data_sender: DataSender<RawEthData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let filter = Filter::new()
.address(self.bridge_address)
.from_block(starting_checkpoint)
.to_block(target_checkpoint);
.from_block(task.start_checkpoint)
.to_block(task.target_checkpoint);

let eth_ws_url = self.eth_ws_url.clone();
let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone();
Expand Down Expand Up @@ -194,8 +194,7 @@ impl EthSyncDatasource {
impl Datasource<RawEthData> for EthSyncDatasource {
async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
data_sender: DataSender<RawEthData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let provider = Arc::new(
Expand All @@ -214,8 +213,8 @@ impl Datasource<RawEthData> for EthSyncDatasource {
let Ok(Ok(logs)) = retry_with_max_elapsed_time!(
client.get_raw_events_in_range(
bridge_address,
starting_checkpoint,
target_checkpoint
task.start_checkpoint,
task.target_checkpoint
),
Duration::from_secs(30000)
) else {
Expand Down Expand Up @@ -254,7 +253,7 @@ impl Datasource<RawEthData> for EthSyncDatasource {
data.push((log, block, transaction));
}

data_sender.send((target_checkpoint, data)).await?;
data_sender.send((task.target_checkpoint, data)).await?;

indexer_metrics
.last_synced_eth_block
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-bridge-indexer/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use diesel::data_types::PgTimestamp;
use diesel::{Identifiable, Insertable, Queryable, Selectable};

use sui_indexer_builder::Task;
use sui_indexer_builder::{Task, LIVE_TASK_TARGET_CHECKPOINT};

use crate::schema::{
progress_store, sui_error_transactions, sui_progress_store, token_transfer, token_transfer_data,
Expand All @@ -23,10 +23,11 @@ impl From<ProgressStore> for Task {
fn from(value: ProgressStore) -> Self {
Self {
task_name: value.task_name,
checkpoint: value.checkpoint as u64,
start_checkpoint: value.checkpoint as u64,
target_checkpoint: value.target_checkpoint as u64,
// Ok to unwrap, timestamp is defaulted to now() in database
timestamp: value.timestamp.expect("Timestamp not set").0 as u64,
is_live_task: value.target_checkpoint == LIVE_TASK_TARGET_CHECKPOINT,
}
}
}
Expand Down
31 changes: 27 additions & 4 deletions crates/sui-bridge-indexer/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::schema::progress_store::{columns, dsl};
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
use crate::{models, schema, ProcessedTxnData};
use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent};
use sui_indexer_builder::Task;
use sui_indexer_builder::{Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT};

/// Persistent layer impl
#[derive(Clone)]
Expand Down Expand Up @@ -147,7 +147,7 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(None)
}

async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Tasks, anyhow::Error> {
let mut conn = self.pool.get().await?;
// get all unfinished tasks
let cp: Vec<models::ProgressStore> = dsl::progress_store
Expand All @@ -157,7 +157,8 @@ impl IndexerProgressStore for PgBridgePersistent {
.order_by(columns::target_checkpoint.desc())
.load(&mut conn)
.await?;
Ok(cp.into_iter().map(|d| d.into()).collect())
let tasks = cp.into_iter().map(|d| d.into()).collect();
Ok(Tasks::new(tasks)?)
}

async fn get_largest_backfill_task_target_checkpoint(
Expand All @@ -177,6 +178,8 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(cp.map(|c| c as u64))
}

/// Register a new task to progress store with a start checkpoint and target checkpoint.
/// Usually used for backfill tasks.
async fn register_task(
&mut self,
task_name: String,
Expand All @@ -197,11 +200,31 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(())
}

/// Registers the live task in the progress store, beginning at `start_checkpoint`.
/// The stored target checkpoint is always `LIVE_TASK_TARGET_CHECKPOINT`.
async fn register_live_task(
    &mut self,
    task_name: String,
    start_checkpoint: u64,
) -> Result<(), anyhow::Error> {
    let mut conn = self.pool.get().await?;
    // A `None` timestamp lets the DB default it to the current time.
    let live_task_row = models::ProgressStore {
        task_name,
        checkpoint: start_checkpoint as i64,
        target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT,
        timestamp: None,
    };
    diesel::insert_into(schema::progress_store::table)
        .values(live_task_row)
        .execute(&mut conn)
        .await?;
    Ok(())
}

async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
let mut conn = self.pool.get().await?;
diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
.set((
columns::checkpoint.eq(task.checkpoint as i64),
columns::checkpoint.eq(task.start_checkpoint as i64),
columns::target_checkpoint.eq(task.target_checkpoint as i64),
columns::timestamp.eq(now),
))
Expand Down
37 changes: 26 additions & 11 deletions crates/sui-bridge-indexer/src/sui_datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use sui_data_ingestion_core::{
DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
};
use sui_indexer_builder::indexer_builder::{DataSender, Datasource};
use sui_indexer_builder::Task;
use sui_sdk::SuiClient;
use sui_types::base_types::TransactionDigest;
use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData;
use sui_types::full_checkpoint_content::CheckpointTransaction;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
Expand All @@ -23,6 +23,9 @@ use tokio::task::JoinHandle;

use crate::metrics::BridgeIndexerMetrics;

/// Default max number of checkpoints the ingestion framework reader fetches at the same
/// time for backfill tasks. Can be overridden at runtime through the
/// `BACKFILL_TASK_INGESTION_READER_BATCH_SIZE` environment variable.
const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this is the value that defines how much is going to be read in one shot?
Can you add a comment explaining what this means?
Also, is this something that should be defined in the config file besides an env variable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the fetching concurrency in ingestion framework. https://github.com/MystenLabs/sui/blob/main/crates/sui-data-ingestion-core/src/reader.rs#L207
it guarantees at most N checkpoints can be fetched at the same time. A recent change downgraded the default value to 10, which significantly bottlenecked the backfill speed. In backfill tasks we should use a larger number, in live sync 10 is fine.

will add in the comment

/// Max number of checkpoints the ingestion framework reader fetches at the same time for
/// the live task. Kept smaller than the backfill value to be cost effective.
const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10;

pub struct SuiCheckpointDatasource {
remote_store_url: String,
sui_client: Arc<SuiClient>,
Expand Down Expand Up @@ -58,23 +61,32 @@ impl SuiCheckpointDatasource {
impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
data_sender: DataSender<CheckpointTxnData>,
) -> Result<JoinHandle<Result<(), Error>>, Error> {
let (exit_sender, exit_receiver) = oneshot::channel();
let progress_store = PerTaskInMemProgressStore {
current_checkpoint: starting_checkpoint,
exit_checkpoint: target_checkpoint,
current_checkpoint: task.start_checkpoint,
exit_checkpoint: task.target_checkpoint,
exit_sender: Some(exit_sender),
};
// The max concurrency of checkpoints to fetch at the same time for the ingestion framework
let ingestion_reader_batch_size = if task.is_live_task {
// Live task uses smaller number to be cost effective
LIVE_TASK_INGESTION_READER_BATCH_SIZE
} else {
std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE")
.unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE.to_string())
.parse::<usize>()
.unwrap()
};
tracing::info!(
"Starting Sui checkpoint data retrieval with batch size {}",
ingestion_reader_batch_size
);
let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone());
let worker = IndexerWorker::new(data_sender);
let worker_pool = WorkerPool::new(
worker,
TransactionDigest::random().to_string(),
self.concurrency,
);
let worker_pool = WorkerPool::new(worker, task.task_name.clone(), self.concurrency);
executor.register(worker_pool).await?;
let checkpoint_path = self.checkpoint_path.clone();
let remote_store_url = self.remote_store_url.clone();
Expand All @@ -84,7 +96,10 @@ impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
checkpoint_path,
Some(remote_store_url),
vec![], // optional remote store access options
ReaderOptions::default(),
ReaderOptions {
batch_size: ingestion_reader_batch_size,
..Default::default()
},
exit_receiver,
)
.await?;
Expand Down
46 changes: 22 additions & 24 deletions crates/sui-indexer-builder/src/indexer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,7 @@ impl<P, D, M> Indexer<P, D, M> {
let live_task_future = match ongoing_tasks.live_task() {
Some(live_task) if !self.disable_live_task => {
let live_task_future = self.datasource.start_ingestion_task(
live_task.task_name.clone(),
live_task.checkpoint,
live_task.target_checkpoint,
live_task,
self.storage.clone(),
self.data_mapper.clone(),
);
Expand All @@ -127,20 +125,18 @@ impl<P, D, M> Indexer<P, D, M> {
_ => None,
};

let backfill_tasks = ongoing_tasks.backfill_tasks();
let backfill_tasks = ongoing_tasks.backfill_tasks_ordered_desc();
let storage_clone = self.storage.clone();
let data_mapper_clone = self.data_mapper.clone();
let datasource_clone = self.datasource.clone();

let handle = spawn_monitored_task!(async {
// Execute tasks one by one
for backfill_task in backfill_tasks {
if backfill_task.checkpoint < backfill_task.target_checkpoint {
if backfill_task.start_checkpoint < backfill_task.target_checkpoint {
datasource_clone
.start_ingestion_task(
backfill_task.task_name.clone(),
backfill_task.checkpoint,
backfill_task.target_checkpoint,
backfill_task,
storage_clone.clone(),
data_mapper_clone.clone(),
)
Expand Down Expand Up @@ -181,10 +177,9 @@ impl<P, D, M> Indexer<P, D, M> {
match ongoing_tasks.live_task() {
None => {
self.storage
.register_task(
.register_live_task(
format!("{} - Live", self.name),
live_task_from_checkpoint,
i64::MAX as u64,
)
.await
.tap_err(|e| {
Expand All @@ -199,8 +194,8 @@ impl<P, D, M> Indexer<P, D, M> {
// We still check this because in the case of slow
// block generation (e.g. Ethereum), it's possible we will
// stay on the same block for a bit.
if live_task_from_checkpoint != live_task.checkpoint {
live_task.checkpoint = live_task_from_checkpoint;
if live_task_from_checkpoint != live_task.start_checkpoint {
live_task.start_checkpoint = live_task_from_checkpoint;
self.storage.update_task(live_task).await.tap_err(|e| {
tracing::error!(
"Failed to update live task to ({}-MAX): {:?}",
Expand Down Expand Up @@ -318,7 +313,7 @@ pub trait IndexerProgressStore: Send {
target_checkpoint_number: u64,
) -> anyhow::Result<Option<u64>>;

async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Vec<Task>, Error>;
async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Tasks, Error>;

async fn get_largest_backfill_task_target_checkpoint(
&self,
Expand All @@ -328,27 +323,34 @@ pub trait IndexerProgressStore: Send {
async fn register_task(
&mut self,
task_name: String,
checkpoint: u64,
start_checkpoint: u64,
target_checkpoint: u64,
) -> Result<(), anyhow::Error>;

/// Registers a live task named `task_name` in the progress store, starting at
/// `start_checkpoint`. Unlike `register_task`, the caller supplies no target checkpoint.
async fn register_live_task(
&mut self,
task_name: String,
start_checkpoint: u64,
) -> Result<(), anyhow::Error>;

/// Persists the state of `task` to the progress store.
// NOTE(review): presumably the stored record is matched by `task.task_name` — confirm per implementation.
async fn update_task(&mut self, task: Task) -> Result<(), Error>;
}

#[async_trait]
pub trait Datasource<T: Send>: Sync + Send {
async fn start_ingestion_task<M, P, R>(
&self,
task_name: String,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
mut storage: P,
data_mapper: M,
) -> Result<(), Error>
where
M: DataMapper<T, R>,
P: Persistent<R>,
{
let task_name = task.task_name.clone();
let starting_checkpoint = task.start_checkpoint;
let target_checkpoint = task.target_checkpoint;
let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE")
.unwrap_or(INGESTION_BATCH_SIZE.to_string())
.parse::<usize>()
Expand All @@ -365,18 +367,15 @@ pub trait Datasource<T: Send>: Sync + Send {
starting_checkpoint,
target_checkpoint,
);
let is_live_task = target_checkpoint == i64::MAX as u64;
let (data_sender, data_rx) = metered_channel::channel(
checkpoint_channel_size,
&mysten_metrics::get_metrics()
.unwrap()
.channel_inflight
.with_label_values(&[&task_name]),
);
let join_handle = self
.start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender)
.await?;

let is_live_task = task.is_live_task;
let join_handle = self.start_data_retrieval(task, data_sender).await?;
let processed_checkpoints_metrics = self
.get_tasks_processed_checkpoints_metric()
.with_label_values(&[&task_name]);
Expand Down Expand Up @@ -499,8 +498,7 @@ pub trait Datasource<T: Send>: Sync + Send {

async fn start_data_retrieval(
&self,
starting_checkpoint: u64,
target_checkpoint: u64,
task: Task,
data_sender: DataSender<T>,
) -> Result<JoinHandle<Result<(), Error>>, Error>;

Expand Down
Loading