@@ -53,8 +53,9 @@ use tracing::{debug, error, field::debug, info, instrument, trace, warn};
53
53
use super :: {
54
54
algorithms:: { rfind_event_by_id, rfind_event_by_item_id} ,
55
55
controller:: {
56
- Aggregation , AggregationKind , ObservableItemsTransaction , ObservableItemsTransactionEntry ,
57
- PendingEdit , PendingEditKind , TimelineMetadata , TimelineStateTransaction ,
56
+ find_item_and_apply_aggregation, Aggregation , AggregationKind , ObservableItemsTransaction ,
57
+ ObservableItemsTransactionEntry , PendingEdit , PendingEditKind , TimelineMetadata ,
58
+ TimelineStateTransaction ,
58
59
} ,
59
60
date_dividers:: DateDividerAdjuster ,
60
61
event_item:: {
@@ -411,23 +412,13 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
411
412
412
413
AnyMessageLikeEventContent :: Sticker ( content) => {
413
414
if should_add {
414
- let mut content = TimelineItemContent :: Sticker ( Sticker {
415
- content,
416
- reactions : Default :: default ( ) ,
417
- } ) ;
418
-
419
- if let Some ( event_id) = self . ctx . flow . event_id ( ) {
420
- // Applying the cache to remote events only because local echoes
421
- // don't have an event ID that could be referenced by responses yet.
422
- if let Err ( err) = self . meta . aggregations . apply (
423
- & TimelineEventItemId :: EventId ( event_id. to_owned ( ) ) ,
424
- & mut content,
425
- ) {
426
- warn ! ( "discarding sticker aggregations: {err}" ) ;
427
- }
428
- }
429
-
430
- self . add_item ( content, None ) ;
415
+ self . add_item (
416
+ TimelineItemContent :: Sticker ( Sticker {
417
+ content,
418
+ reactions : Default :: default ( ) ,
419
+ } ) ,
420
+ None ,
421
+ ) ;
431
422
}
432
423
}
433
424
@@ -587,15 +578,10 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
587
578
588
579
let edit_json = edit_json. flatten ( ) ;
589
580
590
- let mut content =
591
- TimelineItemContent :: message ( msg, edit_content, self . items , Default :: default ( ) ) ;
592
- if let Err ( err) =
593
- self . meta . aggregations . apply ( & self . ctx . flow . timeline_item_id ( ) , & mut content)
594
- {
595
- warn ! ( "discarding message aggregations: {err}" ) ;
596
- }
597
-
598
- self . add_item ( content, edit_json) ;
581
+ self . add_item (
582
+ TimelineItemContent :: message ( msg, edit_content, self . items , Default :: default ( ) ) ,
583
+ edit_json,
584
+ ) ;
599
585
}
600
586
601
587
#[ instrument( skip_all, fields( replacement_event_id = ?replacement. event_id) ) ]
@@ -732,7 +718,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
732
718
/// [`crate::timeline::TimelineController::handle_local_echo`].
733
719
#[ instrument( skip_all, fields( relates_to_event_id = ?c. relates_to. event_id) ) ]
734
720
fn handle_reaction ( & mut self , c : ReactionEventContent ) {
735
- let reacted_to_event_id = & c. relates_to . event_id ;
721
+ let target = TimelineEventItemId :: EventId ( c. relates_to . event_id ) ;
736
722
737
723
// Add the aggregation to the manager.
738
724
let reaction_status = match & self . ctx . flow {
@@ -754,28 +740,10 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
754
740
reaction_status,
755
741
} ,
756
742
) ;
757
- self . meta
758
- . aggregations
759
- . add ( TimelineEventItemId :: EventId ( reacted_to_event_id. clone ( ) ) , aggregation. clone ( ) ) ;
760
743
761
- let Some ( ( idx, event_item) ) = rfind_event_by_id ( self . items , reacted_to_event_id) else {
762
- warn ! ( "couldn't find reaction's target {reacted_to_event_id:?}" ) ;
763
- return ;
764
- } ;
765
-
766
- let mut new_content = event_item. content ( ) . clone ( ) ;
767
- match aggregation. apply ( & mut new_content) {
768
- Ok ( true ) => {
769
- trace ! ( "added reaction" ) ;
770
- let new_item = event_item. with_content ( new_content) ;
771
- self . items
772
- . replace ( idx, TimelineItem :: new ( new_item, event_item. internal_id . to_owned ( ) ) ) ;
773
- self . result . items_updated += 1 ;
774
- }
775
- Ok ( false ) => { }
776
- Err ( err) => {
777
- warn ! ( "error when applying reaction aggregation: {err}" ) ;
778
- }
744
+ self . meta . aggregations . add ( target. clone ( ) , aggregation. clone ( ) ) ;
745
+ if find_item_and_apply_aggregation ( self . items , & target, aggregation) {
746
+ self . result . items_updated += 1 ;
779
747
}
780
748
}
781
749
@@ -872,21 +840,14 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
872
840
. unzip ( ) ;
873
841
874
842
let poll_state = PollState :: new ( c, edit_content, Default :: default ( ) ) ;
875
- let mut content = TimelineItemContent :: Poll ( poll_state) ;
876
- if let Err ( err) =
877
- self . meta . aggregations . apply ( & self . ctx . flow . timeline_item_id ( ) , & mut content)
878
- {
879
- warn ! ( "discarding poll aggregations: {err}" ) ;
880
- }
881
843
882
844
let edit_json = edit_json. flatten ( ) ;
883
845
884
- self . add_item ( content , edit_json) ;
846
+ self . add_item ( TimelineItemContent :: Poll ( poll_state ) , edit_json) ;
885
847
}
886
848
887
849
fn handle_poll_response ( & mut self , c : UnstablePollResponseEventContent ) {
888
- let start_event_id = c. relates_to . event_id ;
889
-
850
+ let target = TimelineEventItemId :: EventId ( c. relates_to . event_id ) ;
890
851
let aggregation = Aggregation :: new (
891
852
self . ctx . flow . timeline_item_id ( ) ,
892
853
AggregationKind :: PollResponse {
@@ -895,60 +856,21 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
895
856
answers : c. poll_response . answers ,
896
857
} ,
897
858
) ;
898
-
899
- self . meta
900
- . aggregations
901
- . add ( TimelineEventItemId :: EventId ( start_event_id. clone ( ) ) , aggregation. clone ( ) ) ;
902
-
903
- let Some ( ( item_pos, item) ) = rfind_event_by_id ( self . items , & start_event_id) else {
904
- return ;
905
- } ;
906
-
907
- let mut new_content = item. content ( ) . clone ( ) ;
908
- match aggregation. apply ( & mut new_content) {
909
- Ok ( true ) => {
910
- trace ! ( "adding poll response." ) ;
911
- self . items . replace (
912
- item_pos,
913
- TimelineItem :: new ( item. with_content ( new_content) , item. internal_id . clone ( ) ) ,
914
- ) ;
915
- self . result . items_updated += 1 ;
916
- }
917
- Ok ( false ) => { }
918
- Err ( err) => {
919
- warn ! ( "discarding poll response: {err}" ) ;
920
- }
859
+ self . meta . aggregations . add ( target. clone ( ) , aggregation. clone ( ) ) ;
860
+ if find_item_and_apply_aggregation ( self . items , & target, aggregation) {
861
+ self . result . items_updated += 1 ;
921
862
}
922
863
}
923
864
924
865
fn handle_poll_end ( & mut self , c : UnstablePollEndEventContent ) {
925
- let start_event_id = c. relates_to . event_id ;
926
-
866
+ let target = TimelineEventItemId :: EventId ( c. relates_to . event_id ) ;
927
867
let aggregation = Aggregation :: new (
928
868
self . ctx . flow . timeline_item_id ( ) ,
929
869
AggregationKind :: PollEnd { end_date : self . ctx . timestamp } ,
930
870
) ;
931
- self . meta
932
- . aggregations
933
- . add ( TimelineEventItemId :: EventId ( start_event_id. clone ( ) ) , aggregation. clone ( ) ) ;
934
-
935
- let Some ( ( item_pos, item) ) = rfind_event_by_id ( self . items , & start_event_id) else {
936
- return ;
937
- } ;
938
-
939
- let mut new_content = item. content ( ) . clone ( ) ;
940
- match aggregation. apply ( & mut new_content) {
941
- Ok ( true ) => {
942
- trace ! ( "Ending poll." ) ;
943
- let new_item = item. with_content ( new_content) ;
944
- self . items
945
- . replace ( item_pos, TimelineItem :: new ( new_item, item. internal_id . to_owned ( ) ) ) ;
946
- self . result . items_updated += 1 ;
947
- }
948
- Ok ( false ) => { }
949
- Err ( err) => {
950
- warn ! ( "discarding poll end: {err}" ) ;
951
- }
871
+ self . meta . aggregations . add ( target. clone ( ) , aggregation. clone ( ) ) ;
872
+ if find_item_and_apply_aggregation ( self . items , & target, aggregation) {
873
+ self . result . items_updated += 1 ;
952
874
}
953
875
}
954
876
@@ -1044,11 +966,18 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
1044
966
/// timeline item being added here.
1045
967
fn add_item (
1046
968
& mut self ,
1047
- content : TimelineItemContent ,
969
+ mut content : TimelineItemContent ,
1048
970
edit_json : Option < Raw < AnySyncTimelineEvent > > ,
1049
971
) {
1050
972
self . result . item_added = true ;
1051
973
974
+ // Apply any pending or stashed aggregations.
975
+ if let Err ( err) =
976
+ self . meta . aggregations . apply ( & self . ctx . flow . timeline_item_id ( ) , & mut content)
977
+ {
978
+ warn ! ( "discarding aggregations: {err}" ) ;
979
+ }
980
+
1052
981
let sender = self . ctx . sender . to_owned ( ) ;
1053
982
let sender_profile = TimelineDetails :: from_initial_value ( self . ctx . sender_profile . clone ( ) ) ;
1054
983
let timestamp = self . ctx . timestamp ;
0 commit comments