12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
+ //! An aggregation manager for the timeline.
16
+ //!
17
+ //! An aggregation is an event that relates to another event: for instance, a
18
+ //! reaction, a poll response, and so on and so forth.
19
+ //!
20
+ //! Because of the sync mechanisms and federation, it can happen that a related
21
+ //! event is received *before* receiving the event it relates to. Those events
22
+ //! must be accounted for, stashed somewhere, and reapplied later, if/when the
23
+ //! related-to event shows up.
24
+ //!
25
+ //! In addition to that, a room's event cache can also decide to move events
26
+ //! around, in its own internal representation (likely because it ran into some
27
+ //! duplicate events). When that happens, a timeline opened on the given room
28
+ //! will see a removal then re-insertion of the given event. If that event was
29
+ //! the target of aggregations, then those aggregations must be re-applied when
30
+ //! the given event is reinserted.
31
+ //!
32
+ //! To satisfy both requirements, the [`Aggregations`] "manager" object provided
33
+ //! by this module will take care of memoizing aggregations, for the entire
34
+ //! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some
35
+ //! caller). Aggregations are saved in memory, and have the same lifetime as
36
+ //! that of a timeline. This makes it possible to apply pending aggregations
37
+ //! to cater for the first use case, and to never lose any aggregations in the
38
+ //! second use case.
39
+
15
40
use std:: collections:: HashMap ;
16
41
17
42
use ruma:: { MilliSecondsSinceUnixEpoch , OwnedEventId , OwnedTransactionId , OwnedUserId } ;
@@ -21,32 +46,60 @@ use crate::timeline::{
21
46
PollState , ReactionInfo , ReactionStatus , TimelineEventItemId , TimelineItemContent ,
22
47
} ;
23
48
49
+ /// Which kind of aggregation (related event) is this?
24
50
#[ derive( Clone , Debug ) ]
25
51
pub ( crate ) enum AggregationKind {
52
+ /// This is a response to a poll.
26
53
PollResponse {
54
+ /// Sender of the poll's response.
27
55
sender : OwnedUserId ,
56
+ /// Timestamp at which the response has beens ent.
28
57
timestamp : MilliSecondsSinceUnixEpoch ,
58
+ /// All the answers to the poll sent by the sender.
29
59
answers : Vec < String > ,
30
60
} ,
31
61
62
+ /// This is the marker of the end of a poll.
32
63
PollEnd {
64
+ /// Timestamp at which the poll ends, i.e. all the responses with a
65
+ /// timestamp prior to this one should be taken into account
66
+ /// (and all the responses with a timestamp after this one
67
+ /// should be dropped).
33
68
end_date : MilliSecondsSinceUnixEpoch ,
34
69
} ,
35
70
71
+ /// This is a reaction to another event.
36
72
Reaction {
73
+ /// The reaction "key" displayed by the client, often an emoji.
37
74
key : String ,
75
+ /// Sender of the reaction.
38
76
sender : OwnedUserId ,
77
+ /// Timestamp at which the reaction has been sent.
39
78
timestamp : MilliSecondsSinceUnixEpoch ,
79
+ /// The send status of the reaction this is, with handles to abort it if
80
+ /// we can, etc.
40
81
reaction_status : ReactionStatus ,
41
82
} ,
42
83
}
43
84
85
+ /// An aggregation is an event related to another event (for instance a
86
+ /// reaction, a poll's response, etc.).
87
+ ///
88
+ /// It can be either a local or a remote echo.
44
89
#[ derive( Clone , Debug ) ]
45
90
pub ( crate ) struct Aggregation {
91
+ /// The kind of aggregation this represents.
46
92
pub kind : AggregationKind ,
93
+
94
+ /// The own timeline identifier for a reaction.
95
+ ///
96
+ /// It will be a transaction id when the aggregation is still a local echo,
97
+ /// and it will transition into an event id when the aggregation is a
98
+ /// remote echo (i.e. has been received in a sync response):
47
99
pub own_id : TimelineEventItemId ,
48
100
}
49
101
102
+ /// Get the poll state from a given [`TimelineItemContent`].
50
103
fn poll_state_from_item (
51
104
content : & mut TimelineItemContent ,
52
105
) -> Result < & mut PollState , AggregationError > {
@@ -60,15 +113,20 @@ fn poll_state_from_item(
60
113
}
61
114
62
115
impl Aggregation {
116
+ /// Create a new [`Aggregation`].
63
117
pub fn new ( own_id : TimelineEventItemId , kind : AggregationKind ) -> Self {
64
118
Self { kind, own_id }
65
119
}
66
120
67
121
/// Apply an aggregation in-place to a given [`TimelineItemContent`].
68
122
///
69
123
/// In case of success, returns a boolean indicating whether applying the
70
- /// aggregation caused a change in the content. In case of error,
71
- /// returns an error detailing why the aggregation couldn't be applied.
124
+ /// aggregation caused a change in the content. If the aggregation could be
125
+ /// applied, but it didn't cause any change in the passed
126
+ /// [`TimelineItemContent`], `Ok(false)` will be returned.
127
+ ///
128
+ /// In case of error, returns an error detailing why the aggregation
129
+ /// couldn't be applied.
72
130
pub fn apply ( & self , content : & mut TimelineItemContent ) -> Result < bool , AggregationError > {
73
131
match & self . kind {
74
132
AggregationKind :: PollResponse { sender, timestamp, answers } => {
@@ -122,9 +180,13 @@ impl Aggregation {
122
180
123
181
/// Undo an aggregation in-place to a given [`TimelineItemContent`].
124
182
///
125
- /// In case of success, returns a boolean indicating whether applying the
126
- /// aggregation caused a change in the content. In case of error,
127
- /// returns an error detailing why the aggregation couldn't be applied.
183
+ /// In case of success, returns a boolean indicating whether unapplying the
184
+ /// aggregation caused a change in the content. If the aggregation could be
185
+ /// unapplied, but it didn't cause any change in the passed
186
+ /// [`TimelineItemContent`], `Ok(false)` will be returned.
187
+ ///
188
+ /// In case of error, returns an error detailing why the aggregation
189
+ /// couldn't be applied.
128
190
pub fn unapply ( & self , content : & mut TimelineItemContent ) -> Result < bool , AggregationError > {
129
191
match & self . kind {
130
192
AggregationKind :: PollResponse { sender, timestamp, .. } => {
@@ -172,11 +234,14 @@ pub(crate) struct Aggregations {
172
234
}
173
235
174
236
impl Aggregations {
237
+ /// Clear all the known aggregations from all the mappings.
175
238
pub fn clear ( & mut self ) {
176
239
self . related_events . clear ( ) ;
177
240
self . inverted_map . clear ( ) ;
178
241
}
179
242
243
+ /// Add a given aggregation that relates to the [`TimelineItemContent`]
244
+ /// identified by the given [`TimelineEventItemId`].
180
245
pub fn add ( & mut self , related_to : TimelineEventItemId , aggregation : Aggregation ) {
181
246
self . inverted_map . insert ( aggregation. own_id . clone ( ) , related_to. clone ( ) ) ;
182
247
self . related_events . entry ( related_to) . or_default ( ) . push ( aggregation) ;
@@ -192,20 +257,35 @@ impl Aggregations {
192
257
let found = self . inverted_map . get ( aggregation_id) ?;
193
258
194
259
// Find and remove the aggregation in the other mapping.
195
- let aggregation = self . related_events . get_mut ( found) . and_then ( |aggregations| {
196
- aggregations
260
+ let aggregation = if let Some ( aggregations ) = self . related_events . get_mut ( found) {
261
+ let removed = aggregations
197
262
. iter ( )
198
263
. position ( |agg| agg. own_id == * aggregation_id)
199
- . map ( |idx| aggregations. remove ( idx) )
200
- } ) ;
264
+ . map ( |idx| aggregations. remove ( idx) ) ;
265
+
266
+ // If this was the last aggregation, remove the entry in the `related_events`
267
+ // mapping.
268
+ if aggregations. is_empty ( ) {
269
+ self . related_events . remove ( found) ;
270
+ }
271
+
272
+ removed
273
+ } else {
274
+ None
275
+ } ;
201
276
202
277
if aggregation. is_none ( ) {
203
- warn ! ( "unexpected missing aggregation {aggregation_id:?} ( was present in the inverted map, not in the actual map) " ) ;
278
+ warn ! ( "incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map. " ) ;
204
279
}
205
280
206
281
Some ( ( found, aggregation?) )
207
282
}
208
283
284
+ /// Apply all the aggregations to a [`TimelineItemContent`].
285
+ ///
286
+ /// Will return an error at the first aggregation that couldn't be applied;
287
+ /// see [`Aggregation::apply`] which explains under which conditions it can
288
+ /// happen.
209
289
pub fn apply (
210
290
& self ,
211
291
item_id : & TimelineEventItemId ,
@@ -237,13 +317,18 @@ impl Aggregations {
237
317
}
238
318
}
239
319
// Update the direct mapping of target -> aggregations.
240
- self . related_events . insert ( to, aggregations) ;
320
+ self . related_events . entry ( to) . or_default ( ) . extend ( aggregations) ;
241
321
}
242
322
}
243
323
244
324
/// Mark an aggregation event as being sent (i.e. it transitions from an
245
325
/// local transaction id to its remote event id counterpart), by
246
326
/// updating the internal mappings.
327
+ ///
328
+ /// When an aggregation has been marked as sent, it may need to be reapplied
329
+ /// to the corresponding [`TimelineItemContent`]; in this case, a
330
+ /// [`MarkAggregationSentResult::MarkedSent`] result with a set `update`
331
+ /// will be returned, and must be applied.
247
332
pub fn mark_aggregation_as_sent (
248
333
& mut self ,
249
334
txn_id : OwnedTransactionId ,
@@ -284,8 +369,14 @@ impl Aggregations {
284
369
}
285
370
}
286
371
372
+ /// The result of marking an aggregation as sent.
287
373
pub ( crate ) enum MarkAggregationSentResult {
374
+ /// The aggregation has been found, and marked as sent.
375
+ ///
376
+ /// Optionally, it can include an [`Aggregation`] `update` to the matching
377
+ /// [`TimelineItemContent`] item identified by the [`TimelineEventItemId`].
288
378
MarkedSent { update : Option < ( TimelineEventItemId , Aggregation ) > } ,
379
+ /// The aggregation was unknown to the aggregations manager, aka not found.
289
380
NotFound ,
290
381
}
291
382
0 commit comments