@@ -614,6 +614,133 @@ mod tests {
614614 }
615615 }
616616
617+ #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
618+ async fn observable_counter_delta_attribute_set_reappears_after_gap ( ) {
619+ // Run this test with stdout enabled to see output.
620+ // cargo test observable_counter_delta_attribute_set_reappears_after_gap --features=testing -- --nocapture
621+
622+ // This test verifies the behavior when an attribute set is not reported
623+ // for one collection cycle and then reappears.
624+ // See: https://github.com/open-telemetry/opentelemetry-specification/issues/4861
625+ //
626+ // Scenario (Observable Counter with Delta temporality):
627+ // | Collection | Callback Reports | Expected Delta Export |
628+ // |------------|-------------------|------------------------|
629+ // | 1 | A=100, B=50 | A=100, B=50 |
630+ // | 2 | A=150 (B missing) | A=50 (B not exported) |
631+ // | 3 | A=200, B=80 | A=50, B=80 |
632+ //
633+ // Current implementation: When B reappears, its delta is calculated from zero
634+ // (fresh start), not from the last known value. This is Option 1 from the spec issue.
635+
636+ let mut test_context = TestContext :: new ( Temporality :: Delta ) ;
637+
638+ // Shared state for callback: (collection_cycle, value_a, value_b_option)
639+ // value_b_option is None when B should not be reported
640+ let callback_state = Arc :: new ( Mutex :: new ( ( 0u32 , 0u64 , Option :: < u64 > :: None ) ) ) ;
641+ let callback_state_clone = callback_state. clone ( ) ;
642+
643+ let _observable_counter = test_context
644+ . meter ( )
645+ . u64_observable_counter ( "my_observable_counter" )
646+ . with_callback ( move |observer| {
647+ let state = callback_state_clone. lock ( ) . unwrap ( ) ;
648+ let ( _cycle, value_a, value_b_option) = * state;
649+
650+ observer. observe ( value_a, & [ KeyValue :: new ( "key" , "A" ) ] ) ;
651+ if let Some ( value_b) = value_b_option {
652+ observer. observe ( value_b, & [ KeyValue :: new ( "key" , "B" ) ] ) ;
653+ }
654+ } )
655+ . build ( ) ;
656+
657+ // Collection 1: A=100, B=50
658+ {
659+ * callback_state. lock ( ) . unwrap ( ) = ( 1 , 100 , Some ( 50 ) ) ;
660+ test_context. flush_metrics ( ) ;
661+
662+ let MetricData :: Sum ( sum) =
663+ test_context. get_aggregation :: < u64 > ( "my_observable_counter" , None )
664+ else {
665+ unreachable ! ( )
666+ } ;
667+
668+ assert_eq ! ( sum. data_points. len( ) , 2 ) ;
669+ assert_eq ! ( sum. temporality, Temporality :: Delta ) ;
670+
671+ let dp_a = find_sum_datapoint_with_key_value ( & sum. data_points , "key" , "A" )
672+ . expect ( "datapoint for A expected" ) ;
673+ let dp_b = find_sum_datapoint_with_key_value ( & sum. data_points , "key" , "B" )
674+ . expect ( "datapoint for B expected" ) ;
675+
676+ // First collection: delta = value - 0
677+ assert_eq ! (
678+ dp_a. value, 100 ,
679+ "A's delta should be 100 (first collection)"
680+ ) ;
681+ assert_eq ! ( dp_b. value, 50 , "B's delta should be 50 (first collection)" ) ;
682+
683+ test_context. reset_metrics ( ) ;
684+ }
685+
686+ // Collection 2: A=150, B missing
687+ {
688+ * callback_state. lock ( ) . unwrap ( ) = ( 2 , 150 , None ) ;
689+ test_context. flush_metrics ( ) ;
690+
691+ let MetricData :: Sum ( sum) =
692+ test_context. get_aggregation :: < u64 > ( "my_observable_counter" , None )
693+ else {
694+ unreachable ! ( )
695+ } ;
696+
697+ // Only A should be exported, B is not observed so not exported (per spec)
698+ assert_eq ! (
699+ sum. data_points. len( ) ,
700+ 1 ,
701+ "Only A should be exported when B is not observed"
702+ ) ;
703+
704+ let dp_a = find_sum_datapoint_with_key_value ( & sum. data_points , "key" , "A" )
705+ . expect ( "datapoint for A expected" ) ;
706+ assert_eq ! ( dp_a. value, 50 , "A's delta should be 50 (150 - 100)" ) ;
707+
708+ test_context. reset_metrics ( ) ;
709+ }
710+
711+ // Collection 3: A=200, B=80 (B reappears)
712+ {
713+ * callback_state. lock ( ) . unwrap ( ) = ( 3 , 200 , Some ( 80 ) ) ;
714+ test_context. flush_metrics ( ) ;
715+
716+ let MetricData :: Sum ( sum) =
717+ test_context. get_aggregation :: < u64 > ( "my_observable_counter" , None )
718+ else {
719+ unreachable ! ( )
720+ } ;
721+
722+ assert_eq ! ( sum. data_points. len( ) , 2 ) ;
723+
724+ let dp_a = find_sum_datapoint_with_key_value ( & sum. data_points , "key" , "A" )
725+ . expect ( "datapoint for A expected" ) ;
726+ let dp_b = find_sum_datapoint_with_key_value ( & sum. data_points , "key" , "B" )
727+ . expect ( "datapoint for B expected" ) ;
728+
729+ assert_eq ! ( dp_a. value, 50 , "A's delta should be 50 (200 - 150)" ) ;
730+
731+ // B reappears after a gap. Current implementation uses "delta from zero" (Option 1).
732+ // This means B's delta = 80 - 0 = 80, not 80 - 50 = 30.
733+ // See: https://github.com/open-telemetry/opentelemetry-specification/issues/4861
734+ // TODO: Watch for spec clarification on this behavior.
735+ assert_eq ! (
736+ dp_b. value, 80 ,
737+ "B's delta should be 80 (fresh start after gap, not 30 from last known value)"
738+ ) ;
739+
740+ test_context. reset_metrics ( ) ;
741+ }
742+ }
743+
617744 #[ tokio:: test( flavor = "multi_thread" , worker_threads = 1 ) ]
618745 async fn empty_meter_name_retained ( ) {
619746 async fn meter_name_retained_helper (
0 commit comments