-
Notifications
You must be signed in to change notification settings - Fork 11.7k
[bridge-indexer] revamp task #19245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[bridge-indexer] revamp task #19245
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -12,8 +12,8 @@ use sui_data_ingestion_core::{ | |
| DataIngestionMetrics, IndexerExecutor, ProgressStore, ReaderOptions, Worker, WorkerPool, | ||
| }; | ||
| use sui_indexer_builder::indexer_builder::{DataSender, Datasource}; | ||
| use sui_indexer_builder::Task; | ||
| use sui_sdk::SuiClient; | ||
| use sui_types::base_types::TransactionDigest; | ||
| use sui_types::full_checkpoint_content::CheckpointData as SuiCheckpointData; | ||
| use sui_types::full_checkpoint_content::CheckpointTransaction; | ||
| use sui_types::messages_checkpoint::CheckpointSequenceNumber; | ||
|
|
@@ -23,6 +23,9 @@ use tokio::task::JoinHandle; | |
|
|
||
| use crate::metrics::BridgeIndexerMetrics; | ||
|
|
||
| const BACKFILL_TASK_INGESTION_READER_BATCH_SIZE: usize = 300; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume this is the value that defines how much is going to be read in one shot?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the fetching concurrency in ingestion framework. https://github.com/MystenLabs/sui/blob/main/crates/sui-data-ingestion-core/src/reader.rs#L207 will add in the comment |
||
| const LIVE_TASK_INGESTION_READER_BATCH_SIZE: usize = 10; | ||
|
|
||
| pub struct SuiCheckpointDatasource { | ||
| remote_store_url: String, | ||
| sui_client: Arc<SuiClient>, | ||
|
|
@@ -58,23 +61,32 @@ impl SuiCheckpointDatasource { | |
| impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource { | ||
| async fn start_data_retrieval( | ||
| &self, | ||
| starting_checkpoint: u64, | ||
| target_checkpoint: u64, | ||
| task: Task, | ||
| data_sender: DataSender<CheckpointTxnData>, | ||
| ) -> Result<JoinHandle<Result<(), Error>>, Error> { | ||
| let (exit_sender, exit_receiver) = oneshot::channel(); | ||
| let progress_store = PerTaskInMemProgressStore { | ||
| current_checkpoint: starting_checkpoint, | ||
| exit_checkpoint: target_checkpoint, | ||
| current_checkpoint: task.start_checkpoint, | ||
| exit_checkpoint: task.target_checkpoint, | ||
| exit_sender: Some(exit_sender), | ||
| }; | ||
| // The max concurrnecy of checkpoint to fetch at the same time for ingestion framework | ||
| let ingestion_reader_batch_size = if task.is_live_task { | ||
| // Live task uses smaller number to be cost effective | ||
| LIVE_TASK_INGESTION_READER_BATCH_SIZE | ||
| } else { | ||
| std::env::var("BACKFILL_TASK_INGESTION_READER_BATCH_SIZE") | ||
| .unwrap_or(BACKFILL_TASK_INGESTION_READER_BATCH_SIZE.to_string()) | ||
| .parse::<usize>() | ||
| .unwrap() | ||
| }; | ||
| tracing::info!( | ||
| "Starting Sui checkpoint data retrieval with batch size {}", | ||
| ingestion_reader_batch_size | ||
| ); | ||
| let mut executor = IndexerExecutor::new(progress_store, 1, self.metrics.clone()); | ||
| let worker = IndexerWorker::new(data_sender); | ||
| let worker_pool = WorkerPool::new( | ||
| worker, | ||
| TransactionDigest::random().to_string(), | ||
| self.concurrency, | ||
| ); | ||
| let worker_pool = WorkerPool::new(worker, task.task_name.clone(), self.concurrency); | ||
| executor.register(worker_pool).await?; | ||
| let checkpoint_path = self.checkpoint_path.clone(); | ||
| let remote_store_url = self.remote_store_url.clone(); | ||
|
|
@@ -84,7 +96,10 @@ impl Datasource<CheckpointTxnData> for SuiCheckpointDatasource { | |
| checkpoint_path, | ||
| Some(remote_store_url), | ||
| vec![], // optional remote store access options | ||
| ReaderOptions::default(), | ||
| ReaderOptions { | ||
| batch_size: ingestion_reader_batch_size, | ||
| ..Default::default() | ||
| }, | ||
| exit_receiver, | ||
| ) | ||
| .await?; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like that