Skip to content

Commit dd951f8

Browse files
authored
[bridge-indexer] revamp task (#19245)
## Description This PR reworks `Tasks`: 1. get rid of trait `Tasks` and create struct `Tasks` instead. 2. add `is_live_task` field to `Task` 3. pass `Task` to several functions instead of its parameters. 4. for ingestion framework, use a custom batch read size for backfill tasks (this significantly improves the data download speed) ## Test plan How did you test the new or updated feature? --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API:
1 parent 2616286 commit dd951f8

File tree

8 files changed

+175
-92
lines changed

8 files changed

+175
-92
lines changed

crates/sui-bridge-indexer/src/eth_bridge_indexer.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use sui_bridge::error::BridgeError;
1616
use sui_bridge::eth_client::EthClient;
1717
use sui_bridge::metered_eth_provider::MeteredEthHttpProvier;
1818
use sui_bridge::retry_with_max_elapsed_time;
19+
use sui_indexer_builder::Task;
1920
use tokio::task::JoinHandle;
2021
use tracing::info;
2122

@@ -63,14 +64,13 @@ impl EthSubscriptionDatasource {
6364
impl Datasource<RawEthData> for EthSubscriptionDatasource {
6465
async fn start_data_retrieval(
6566
&self,
66-
starting_checkpoint: u64,
67-
target_checkpoint: u64,
67+
task: Task,
6868
data_sender: DataSender<RawEthData>,
6969
) -> Result<JoinHandle<Result<(), Error>>, Error> {
7070
let filter = Filter::new()
7171
.address(self.bridge_address)
72-
.from_block(starting_checkpoint)
73-
.to_block(target_checkpoint);
72+
.from_block(task.start_checkpoint)
73+
.to_block(task.target_checkpoint);
7474

7575
let eth_ws_url = self.eth_ws_url.clone();
7676
let indexer_metrics: BridgeIndexerMetrics = self.indexer_metrics.clone();
@@ -194,8 +194,7 @@ impl EthSyncDatasource {
194194
impl Datasource<RawEthData> for EthSyncDatasource {
195195
async fn start_data_retrieval(
196196
&self,
197-
starting_checkpoint: u64,
198-
target_checkpoint: u64,
197+
task: Task,
199198
data_sender: DataSender<RawEthData>,
200199
) -> Result<JoinHandle<Result<(), Error>>, Error> {
201200
let provider = Arc::new(
@@ -214,8 +213,8 @@ impl Datasource<RawEthData> for EthSyncDatasource {
214213
let Ok(Ok(logs)) = retry_with_max_elapsed_time!(
215214
client.get_raw_events_in_range(
216215
bridge_address,
217-
starting_checkpoint,
218-
target_checkpoint
216+
task.start_checkpoint,
217+
task.target_checkpoint
219218
),
220219
Duration::from_secs(30000)
221220
) else {
@@ -254,7 +253,7 @@ impl Datasource<RawEthData> for EthSyncDatasource {
254253
data.push((log, block, transaction));
255254
}
256255

257-
data_sender.send((target_checkpoint, data)).await?;
256+
data_sender.send((task.target_checkpoint, data)).await?;
258257

259258
indexer_metrics
260259
.last_synced_eth_block

crates/sui-bridge-indexer/src/models.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use diesel::data_types::PgTimestamp;
55
use diesel::{Identifiable, Insertable, Queryable, Selectable};
66

7-
use sui_indexer_builder::Task;
7+
use sui_indexer_builder::{Task, LIVE_TASK_TARGET_CHECKPOINT};
88

99
use crate::schema::{
1010
progress_store, sui_error_transactions, sui_progress_store, token_transfer, token_transfer_data,
@@ -23,10 +23,11 @@ impl From<ProgressStore> for Task {
2323
fn from(value: ProgressStore) -> Self {
2424
Self {
2525
task_name: value.task_name,
26-
checkpoint: value.checkpoint as u64,
26+
start_checkpoint: value.checkpoint as u64,
2727
target_checkpoint: value.target_checkpoint as u64,
2828
// Ok to unwrap, timestamp is defaulted to now() in database
2929
timestamp: value.timestamp.expect("Timestamp not set").0 as u64,
30+
is_live_task: value.target_checkpoint == LIVE_TASK_TARGET_CHECKPOINT,
3031
}
3132
}
3233
}

crates/sui-bridge-indexer/src/storage.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use crate::schema::progress_store::{columns, dsl};
2121
use crate::schema::{sui_error_transactions, token_transfer, token_transfer_data};
2222
use crate::{models, schema, ProcessedTxnData};
2323
use sui_indexer_builder::indexer_builder::{IndexerProgressStore, Persistent};
24-
use sui_indexer_builder::Task;
24+
use sui_indexer_builder::{Task, Tasks, LIVE_TASK_TARGET_CHECKPOINT};
2525

2626
/// Persistent layer impl
2727
#[derive(Clone)]
@@ -147,7 +147,7 @@ impl IndexerProgressStore for PgBridgePersistent {
147147
Ok(None)
148148
}
149149

150-
async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Vec<Task>, anyhow::Error> {
150+
async fn get_ongoing_tasks(&self, prefix: &str) -> Result<Tasks, anyhow::Error> {
151151
let mut conn = self.pool.get().await?;
152152
// get all unfinished tasks
153153
let cp: Vec<models::ProgressStore> = dsl::progress_store
@@ -157,7 +157,8 @@ impl IndexerProgressStore for PgBridgePersistent {
157157
.order_by(columns::target_checkpoint.desc())
158158
.load(&mut conn)
159159
.await?;
160-
Ok(cp.into_iter().map(|d| d.into()).collect())
160+
let tasks = cp.into_iter().map(|d| d.into()).collect();
161+
Ok(Tasks::new(tasks)?)
161162
}
162163

163164
async fn get_largest_backfill_task_target_checkpoint(
@@ -177,6 +178,8 @@ impl IndexerProgressStore for PgBridgePersistent {
177178
Ok(cp.map(|c| c as u64))
178179
}
179180

181+
/// Register a new task in the progress store with a start checkpoint and target checkpoint.
182+
/// Usually used for backfill tasks.
180183
async fn register_task(
181184
&mut self,
182185
task_name: String,
@@ -197,11 +200,31 @@ impl IndexerProgressStore for PgBridgePersistent {
197200
Ok(())
198201
}
199202

203+
/// Register a live task in the progress store with a start checkpoint.
204+
async fn register_live_task(
205+
&mut self,
206+
task_name: String,
207+
start_checkpoint: u64,
208+
) -> Result<(), anyhow::Error> {
209+
let mut conn = self.pool.get().await?;
210+
diesel::insert_into(schema::progress_store::table)
211+
.values(models::ProgressStore {
212+
task_name,
213+
checkpoint: start_checkpoint as i64,
214+
target_checkpoint: LIVE_TASK_TARGET_CHECKPOINT,
215+
// Timestamp is defaulted to current time in DB if None
216+
timestamp: None,
217+
})
218+
.execute(&mut conn)
219+
.await?;
220+
Ok(())
221+
}
222+
200223
async fn update_task(&mut self, task: Task) -> Result<(), anyhow::Error> {
201224
let mut conn = self.pool.get().await?;
202225
diesel::update(dsl::progress_store.filter(columns::task_name.eq(task.task_name)))
203226
.set((
204-
columns::checkpoint.eq(task.checkpoint as i64),
227+
columns::checkpoint.eq(task.start_checkpoint as i64),
205228
columns::target_checkpoint.eq(task.target_checkpoint as i64),
206229
columns::timestamp.eq(now),
207230
))

crates/sui-bridge-indexer/src/sui_datasource.rs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ use sui_data_ingestion_core::{
1212
DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool,
1313
};
1414
use sui_indexer_builder::indexer_builder::{DataSender, Datasource};
15+
use sui_indexer_builder::Task;
1516
use sui_sdk::SuiClient;
16-
use sui_types::base_types::TransactionDigest;
1717
use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData;
1818
use sui_types::full_checkpoint_content::CheckpointTransaction;
1919
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
@@ -23,6 +23,9 @@ use tokio::task::JoinHandle;
2323

2424
use crate::metrics::BridgeIndexerMetrics;
2525

26+
const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300;
27+
const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10;
28+
2629
pub struct SuiCheckpointDatasource {
2730
remote_store_url: String,
2831
sui_client: Arc<SuiClient>,
@@ -58,23 +61,32 @@ impl SuiCheckpointDatasource {
5861
impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
5962
async fn start_data_retrieval(
6063
&self,
61-
starting_checkpoint: u64,
62-
target_checkpoint: u64,
64+
task: Task,
6365
data_sender: DataSender<CheckpointTxnData>,
6466
) -> Result<JoinHandle<Result<(), Error>>, Error> {
6567
let (exit_sender, exit_receiver) = oneshot::channel();
6668
let progress_store = PerTaskInMemProgressStore {
67-
current_checkpoint: starting_checkpoint,
68-
exit_checkpoint: target_checkpoint,
69+
current_checkpoint: task.start_checkpoint,
70+
exit_checkpoint: task.target_checkpoint,
6971
exit_sender: Some(exit_sender),
7072
};
73+
// The max concurrency of checkpoints to fetch at the same time for the ingestion framework
74+
let ingestion_reader_batch_size = if task.is_live_task {
75+
// Live task uses a smaller number to be cost-effective
76+
LIVE_TASK_INGESTION_READER_BATCH_SIZE
77+
} else {
78+
std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE")
79+
.unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE.to_string())
80+
.parse::<usize>()
81+
.unwrap()
82+
};
83+
tracing::info!(
84+
"Starting Sui checkpoint data retrieval with batch size {}",
85+
ingestion_reader_batch_size
86+
);
7187
let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone());
7288
let worker = IndexerWorker::new(data_sender);
73-
let worker_pool = WorkerPool::new(
74-
worker,
75-
TransactionDigest::random().to_string(),
76-
self.concurrency,
77-
);
89+
let worker_pool = WorkerPool::new(worker, task.task_name.clone(), self.concurrency);
7890
executor.register(worker_pool).await?;
7991
let checkpoint_path = self.checkpoint_path.clone();
8092
let remote_store_url = self.remote_store_url.clone();
@@ -84,7 +96,10 @@ impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource {
8496
checkpoint_path,
8597
Some(remote_store_url),
8698
vec![], // optional remote store access options
87-
ReaderOptions::default(),
99+
ReaderOptions {
100+
batch_size: ingestion_reader_batch_size,
101+
..Default::default()
102+
},
88103
exit_receiver,
89104
)
90105
.await?;

crates/sui-indexer-builder/src/indexer_builder.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,7 @@ impl<P, D, M> Indexer<P, D, M> {
116116
let live_task_future = match ongoing_tasks.live_task() {
117117
Some(live_task) if !self.disable_live_task => {
118118
let live_task_future = self.datasource.start_ingestion_task(
119-
live_task.task_name.clone(),
120-
live_task.checkpoint,
121-
live_task.target_checkpoint,
119+
live_task,
122120
self.storage.clone(),
123121
self.data_mapper.clone(),
124122
);
@@ -127,20 +125,18 @@ impl<P, D, M> Indexer<P, D, M> {
127125
_ => None,
128126
};
129127

130-
let backfill_tasks = ongoing_tasks.backfill_tasks();
128+
let backfill_tasks = ongoing_tasks.backfill_tasks_ordered_desc();
131129
let storage_clone = self.storage.clone();
132130
let data_mapper_clone = self.data_mapper.clone();
133131
let datasource_clone = self.datasource.clone();
134132

135133
let handle = spawn_monitored_task!(async {
136134
// Execute tasks one by one
137135
for backfill_task in backfill_tasks {
138-
if backfill_task.checkpoint < backfill_task.target_checkpoint {
136+
if backfill_task.start_checkpoint < backfill_task.target_checkpoint {
139137
datasource_clone
140138
.start_ingestion_task(
141-
backfill_task.task_name.clone(),
142-
backfill_task.checkpoint,
143-
backfill_task.target_checkpoint,
139+
backfill_task,
144140
storage_clone.clone(),
145141
data_mapper_clone.clone(),
146142
)
@@ -181,10 +177,9 @@ impl<P, D, M> Indexer<P, D, M> {
181177
match ongoing_tasks.live_task() {
182178
None => {
183179
self.storage
184-
.register_task(
180+
.register_live_task(
185181
format!("{} - Live", self.name),
186182
live_task_from_checkpoint,
187-
i64::MAX as u64,
188183
)
189184
.await
190185
.tap_err(|e| {
@@ -199,8 +194,8 @@ impl<P, D, M> Indexer<P, D, M> {
199194
// We still check this because in the case of slow
200195
// block generation (e.g. Ethereum), it's possible we will
201196
// stay on the same block for a bit.
202-
if live_task_from_checkpoint != live_task.checkpoint {
203-
live_task.checkpoint = live_task_from_checkpoint;
197+
if live_task_from_checkpoint != live_task.start_checkpoint {
198+
live_task.start_checkpoint = live_task_from_checkpoint;
204199
self.storage.update_task(live_task).await.tap_err(|e| {
205200
tracing::error!(
206201
"Failed to update live task to ({}-MAX): {:?}",
@@ -318,7 +313,7 @@ pub trait IndexerProgressStore: Send {
318313
target_checkpoint_number: u64,
319314
) -> anyhow::Result<Option<u64>>;
320315

321-
async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Vec<Task>, Error>;
316+
async fn get_ongoing_tasks(&self, task_prefix: &str) -> Result<Tasks, Error>;
322317

323318
async fn get_largest_backfill_task_target_checkpoint(
324319
&self,
@@ -328,27 +323,34 @@ pub trait IndexerProgressStore: Send {
328323
async fn register_task(
329324
&mut self,
330325
task_name: String,
331-
checkpoint: u64,
326+
start_checkpoint: u64,
332327
target_checkpoint: u64,
333328
) -> Result<(), anyhow::Error>;
334329

330+
async fn register_live_task(
331+
&mut self,
332+
task_name: String,
333+
start_checkpoint: u64,
334+
) -> Result<(), anyhow::Error>;
335+
335336
async fn update_task(&mut self, task: Task) -> Result<(), Error>;
336337
}
337338

338339
#[async_trait]
339340
pub trait Datasource<T: Send>: Sync + Send {
340341
async fn start_ingestion_task<M, P, R>(
341342
&self,
342-
task_name: String,
343-
starting_checkpoint: u64,
344-
target_checkpoint: u64,
343+
task: Task,
345344
mut storage: P,
346345
data_mapper: M,
347346
) -> Result<(), Error>
348347
where
349348
M: DataMapper<T, R>,
350349
P: Persistent<R>,
351350
{
351+
let task_name = task.task_name.clone();
352+
let starting_checkpoint = task.start_checkpoint;
353+
let target_checkpoint = task.target_checkpoint;
352354
let ingestion_batch_size = std::env::var("INGESTION_BATCH_SIZE")
353355
.unwrap_or(INGESTION_BATCH_SIZE.to_string())
354356
.parse::<usize>()
@@ -365,18 +367,15 @@ pub trait Datasource<T: Send>: Sync + Send {
365367
starting_checkpoint,
366368
target_checkpoint,
367369
);
368-
let is_live_task = target_checkpoint == i64::MAX as u64;
369370
let (data_sender, data_rx) = metered_channel::channel(
370371
checkpoint_channel_size,
371372
&mysten_metrics::get_metrics()
372373
.unwrap()
373374
.channel_inflight
374375
.with_label_values(&[&task_name]),
375376
);
376-
let join_handle = self
377-
.start_data_retrieval(starting_checkpoint, target_checkpoint, data_sender)
378-
.await?;
379-
377+
let is_live_task = task.is_live_task;
378+
let join_handle = self.start_data_retrieval(task, data_sender).await?;
380379
let processed_checkpoints_metrics = self
381380
.get_tasks_processed_checkpoints_metric()
382381
.with_label_values(&[&task_name]);
@@ -499,8 +498,7 @@ pub trait Datasource<T: Send>: Sync + Send {
499498

500499
async fn start_data_retrieval(
501500
&self,
502-
starting_checkpoint: u64,
503-
target_checkpoint: u64,
501+
task: Task,
504502
data_sender: DataSender<T>,
505503
) -> Result<JoinHandle<Result<(), Error>>, Error>;
506504

0 commit comments

Comments
 (0)