@@ -14,12 +14,12 @@ use ingestion::IngestionService;
1414use ingestion:: ingestion_client:: IngestionClient ;
1515use metrics:: IndexerMetrics ;
1616use prometheus:: Registry ;
17- use sui_indexer_alt_framework_store_traits:: Connection ;
18- use sui_indexer_alt_framework_store_traits:: InitWatermark ;
1917use sui_indexer_alt_framework_store_traits:: Store ;
2018use sui_indexer_alt_framework_store_traits:: TrailingConnection ;
19+ use sui_indexer_alt_framework_store_traits:: TrailingInitWatermark ;
2120use sui_indexer_alt_framework_store_traits:: TransactionalStore ;
2221use sui_indexer_alt_framework_store_traits:: pipeline_task;
22+ use sui_indexer_alt_framework_store_traits:: { Connection , InitWatermark } ;
2323use tracing:: info;
2424
2525use crate :: metrics:: IngestionMetrics ;
@@ -294,7 +294,13 @@ impl<S: Store> Indexer<S> {
294294 where
295295 H : concurrent:: Handler < Store = S > + Send + Sync + ' static ,
296296 {
297- let Some ( next_checkpoint) = self . add_pipeline :: < H > ( ) . await ? else {
297+ let default_next_checkpoint = self . default_next_checkpoint ;
298+ let Some ( next_checkpoint) = self
299+ . add_pipeline :: < H > ( async |pt, conn| {
300+ Self :: next_checkpoint ( default_next_checkpoint, pt, conn) . await
301+ } )
302+ . await ?
303+ else {
298304 return Ok ( ( ) ) ;
299305 } ;
300306
@@ -345,6 +351,26 @@ impl<S: Store> Indexer<S> {
345351 Ok ( service)
346352 }
347353
354+ fn next_checkpoint < ' c > (
355+ default_next_checkpoint : u64 ,
356+ pipeline_task : String ,
357+ mut conn : S :: Connection < ' c > ,
358+ ) -> impl Future < Output = anyhow:: Result < u64 > > + ' c {
359+ async move {
360+ let InitWatermark {
361+ checkpoint_hi_inclusive,
362+ } = conn
363+ . init_watermark (
364+ & pipeline_task,
365+ InitWatermark {
366+ checkpoint_hi_inclusive : default_next_checkpoint. checked_sub ( 1 ) ,
367+ } ,
368+ )
369+ . await ?;
370+ Ok ( checkpoint_hi_inclusive. map_or ( 0 , |c| c + 1 ) )
371+ }
372+ }
373+
348374 /// Determine the checkpoint for the pipeline to resume processing from. This is either the
349375 /// checkpoint after its watermark, or if that doesn't exist, then the provided
350376 /// [Self::first_checkpoint], and if that is not set, then 0 (genesis).
@@ -353,7 +379,10 @@ impl<S: Store> Indexer<S> {
353379 /// calculated above.
354380 ///
355381 /// Returns `Ok(None)` if the pipeline is disabled.
356- async fn add_pipeline < P : Processor + ' static > ( & mut self ) -> Result < Option < u64 > > {
382+ async fn add_pipeline < P : Processor + ' static > (
383+ & mut self ,
384+ next_checkpoint_fn : impl for < ' c > AsyncFnOnce ( String , S :: Connection < ' c > ) -> anyhow:: Result < u64 > ,
385+ ) -> Result < Option < u64 > > {
357386 ensure ! (
358387 self . added_pipelines. insert( P :: NAME ) ,
359388 "Pipeline {:?} already added" ,
@@ -367,7 +396,7 @@ impl<S: Store> Indexer<S> {
367396 return Ok ( None ) ;
368397 }
369398
370- let mut conn = self
399+ let conn = self
371400 . store
372401 . connect ( )
373402 . await
@@ -376,21 +405,9 @@ impl<S: Store> Indexer<S> {
376405 let pipeline_task =
377406 pipeline_task :: < S > ( P :: NAME , self . task . as_ref ( ) . map ( |t| t. task . as_str ( ) ) ) ?;
378407
379- let InitWatermark {
380- checkpoint_hi_inclusive,
381- reader_lo,
382- } = conn
383- . init_watermark (
384- & pipeline_task,
385- InitWatermark {
386- checkpoint_hi_inclusive : self . default_next_checkpoint . checked_sub ( 1 ) ,
387- reader_lo : self . default_next_checkpoint ,
388- } ,
389- )
408+ let next_checkpoint = next_checkpoint_fn ( pipeline_task. clone ( ) , conn)
390409 . await
391- . with_context ( || format ! ( "Failed to init watermark for {pipeline_task}" ) ) ?;
392-
393- let next_checkpoint = checkpoint_hi_inclusive. map_or ( reader_lo, |c| c + 1 ) ;
410+ . with_context ( || format ! ( "Failed to calculate next_checkpoint for {pipeline_task}" ) ) ?;
394411
395412 self . first_ingestion_checkpoint = next_checkpoint. min ( self . first_ingestion_checkpoint ) ;
396413
@@ -412,7 +429,25 @@ where
412429 where
413430 H : concurrent:: Handler < Store = S > + Send + Sync + ' static ,
414431 {
415- let Some ( next_checkpoint) = self . add_pipeline :: < H > ( ) . await ? else {
432+ let default_next_checkpoint = self . default_next_checkpoint ;
433+ let Some ( next_checkpoint) = self
434+ . add_pipeline :: < H > ( async |pipeline_task, mut conn| {
435+ let TrailingInitWatermark {
436+ checkpoint_hi_inclusive,
437+ reader_lo,
438+ } = conn
439+ . trailing_init_watermark (
440+ & pipeline_task,
441+ TrailingInitWatermark {
442+ checkpoint_hi_inclusive : default_next_checkpoint. checked_sub ( 1 ) ,
443+ reader_lo : default_next_checkpoint,
444+ } ,
445+ )
446+ . await ?;
447+ Ok ( checkpoint_hi_inclusive. map_or ( reader_lo, |c| c + 1 ) )
448+ } )
449+ . await ?
450+ else {
416451 return Ok ( ( ) ) ;
417452 } ;
418453
@@ -449,10 +484,6 @@ impl<T: TransactionalStore> Indexer<T> {
449484 where
450485 H : sequential:: Handler < Store = T > + Send + Sync + ' static ,
451486 {
452- let Some ( next_checkpoint) = self . add_pipeline :: < H > ( ) . await ? else {
453- return Ok ( ( ) ) ;
454- } ;
455-
456487 if self . task . is_some ( ) {
457488 bail ! (
458489 "Sequential pipelines do not support pipeline tasks. \
@@ -461,6 +492,16 @@ impl<T: TransactionalStore> Indexer<T> {
461492 ) ;
462493 }
463494
495+ let default_next_checkpoint = self . default_next_checkpoint ;
496+ let Some ( next_checkpoint) = self
497+ . add_pipeline :: < H > ( async |pt, conn| {
498+ Self :: next_checkpoint ( default_next_checkpoint, pt, conn) . await
499+ } )
500+ . await ?
501+ else {
502+ return Ok ( ( ) ) ;
503+ } ;
504+
464505 // Track the minimum next_checkpoint across all sequential pipelines
465506 self . next_sequential_checkpoint = Some (
466507 self . next_sequential_checkpoint
@@ -740,7 +781,7 @@ mod tests {
740781
741782 let mut conn = store. connect ( ) . await . unwrap ( ) ;
742783
743- conn. init_watermark ( A :: NAME , InitWatermark :: default ( ) )
784+ conn. trailing_init_watermark ( A :: NAME , TrailingInitWatermark :: default ( ) )
744785 . await
745786 . unwrap ( ) ;
746787 conn. set_committer_watermark (
@@ -753,7 +794,7 @@ mod tests {
753794 . await
754795 . unwrap ( ) ;
755796
756- conn. init_watermark ( B :: NAME , InitWatermark :: default ( ) )
797+ conn. trailing_init_watermark ( B :: NAME , TrailingInitWatermark :: default ( ) )
757798 . await
758799 . unwrap ( ) ;
759800 conn. set_committer_watermark (
@@ -766,7 +807,7 @@ mod tests {
766807 . await
767808 . unwrap ( ) ;
768809
769- conn. init_watermark ( C :: NAME , InitWatermark :: default ( ) )
810+ conn. trailing_init_watermark ( C :: NAME , TrailingInitWatermark :: default ( ) )
770811 . await
771812 . unwrap ( ) ;
772813 conn. set_committer_watermark (
@@ -779,7 +820,7 @@ mod tests {
779820 . await
780821 . unwrap ( ) ;
781822
782- conn. init_watermark ( D :: NAME , InitWatermark :: default ( ) )
823+ conn. trailing_init_watermark ( D :: NAME , TrailingInitWatermark :: default ( ) )
783824 . await
784825 . unwrap ( ) ;
785826 conn. set_committer_watermark (
0 commit comments