File tree Expand file tree Collapse file tree 5 files changed +167
-258
lines changed Expand file tree Collapse file tree 5 files changed +167
-258
lines changed Original file line number Diff line number Diff line change @@ -51,7 +51,7 @@ fn indexer_benchmark(c: &mut Criterion) {
51
51
} ) ;
52
52
53
53
c. bench_function ( "persist_checkpoint" , |b| {
54
- b. iter ( || rt . block_on ( store. persist_all_checkpoint_data ( & checkpoints. pop ( ) . unwrap ( ) ) ) )
54
+ b. iter ( || store. persist_all_checkpoint_data ( & checkpoints. pop ( ) . unwrap ( ) ) )
55
55
} ) ;
56
56
57
57
let mut checkpoints = ( 20 ..100 ) . cycle ( ) . map ( CheckpointId :: SequenceNumber ) ;
Original file line number Diff line number Diff line change @@ -365,14 +365,15 @@ where
365
365
// otherwise send it to channel to be committed later.
366
366
if epoch. last_epoch . is_none ( ) {
367
367
let epoch_db_guard = self . metrics . epoch_db_commit_latency . start_timer ( ) ;
368
+ info ! ( "Persisting first epoch..." ) ;
368
369
let mut persist_first_epoch_res = self . state . persist_epoch ( & epoch) . await ;
369
370
while persist_first_epoch_res. is_err ( ) {
370
371
warn ! ( "Failed to persist first epoch, retrying..." ) ;
371
372
persist_first_epoch_res = self . state . persist_epoch ( & epoch) . await ;
372
373
}
373
- self . state . persist_epoch ( & epoch) . await ?;
374
374
epoch_db_guard. stop_and_record ( ) ;
375
375
self . metrics . total_epoch_committed . inc ( ) ;
376
+ info ! ( "Persisted first epoch" ) ;
376
377
} else {
377
378
let epoch_sender_guard = self . epoch_sender . lock ( ) . await ;
378
379
// NOTE: when the channel is full, epoch_sender_guard will wait until the channel has space.
Original file line number Diff line number Diff line change @@ -189,7 +189,7 @@ pub trait IndexerStore {
189
189
) -> Result < usize , IndexerError > ;
190
190
// TODO(gegaowp): keep this method in this trait for now for easier reverting,
191
191
// will remove it if it's no longer needed.
192
- async fn persist_all_checkpoint_data (
192
+ fn persist_all_checkpoint_data (
193
193
& self ,
194
194
data : & TemporaryCheckpointStore ,
195
195
) -> Result < usize , IndexerError > ;
Original file line number Diff line number Diff line change @@ -21,31 +21,6 @@ mod diesel_marco {
21
21
} } ;
22
22
}
23
23
24
- macro_rules! read_only {
25
- ( $pool: expr, $query: expr) => { {
26
- let mut pg_pool_conn = crate :: get_async_pg_pool_connection( $pool) . await ?;
27
- pg_pool_conn
28
- . build_transaction( )
29
- . read_only( )
30
- . run( $query)
31
- . await
32
- . map_err( |e| IndexerError :: PostgresReadError ( e. to_string( ) ) )
33
- } } ;
34
- }
35
-
36
- macro_rules! transactional {
37
- ( $pool: expr, $query: expr) => { {
38
- let mut pg_pool_conn = crate :: get_async_pg_pool_connection( $pool) . await ?;
39
- pg_pool_conn
40
- . build_transaction( )
41
- . serializable( )
42
- . read_write( )
43
- . run( $query)
44
- . await
45
- . map_err( |e| IndexerError :: PostgresWriteError ( e. to_string( ) ) )
46
- } } ;
47
- }
48
-
49
24
macro_rules! transactional_blocking {
50
25
( $pool: expr, $query: expr) => { {
51
26
let mut pg_pool_conn = crate :: get_pg_pool_connection( $pool) ?;
@@ -57,8 +32,6 @@ mod diesel_marco {
57
32
. map_err( |e| IndexerError :: PostgresWriteError ( e. to_string( ) ) )
58
33
} } ;
59
34
}
60
- pub ( crate ) use read_only;
61
35
pub ( crate ) use read_only_blocking;
62
- pub ( crate ) use transactional;
63
36
pub ( crate ) use transactional_blocking;
64
37
}
You can’t perform that action at this time.
0 commit comments