diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index 5f7a0484b85..16aa1592808 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -1040,6 +1040,7 @@ pub struct EventTimelineItem { impl From for EventTimelineItem { fn from(item: matrix_sdk_ui::timeline::EventTimelineItem) -> Self { let reactions = item + .content() .reactions() .iter() .map(|(k, v)| Reaction { diff --git a/crates/matrix-sdk-ui/CHANGELOG.md b/crates/matrix-sdk-ui/CHANGELOG.md index e66529b8e05..5178451c0cb 100644 --- a/crates/matrix-sdk-ui/CHANGELOG.md +++ b/crates/matrix-sdk-ui/CHANGELOG.md @@ -6,6 +6,18 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - ReleaseDate +### Bug Fixes + +### Features + +### Refactor + +- [**breaking**] Reactions on a given timeline item have been moved from + [`EventTimelineItem::reactions()`] to [`TimelineItemContent::reactions()`]; they're thus available + from an [`EventTimelineItem`] by calling `.content().reactions()`. They're also returned by + ownership (cloned) instead of by reference. + ([#4576](https://github.com/matrix-org/matrix-rust-sdk/pull/4576)) + ## [0.10.0] - 2025-02-04 ### Bug Fixes diff --git a/crates/matrix-sdk-ui/src/timeline/algorithms.rs b/crates/matrix-sdk-ui/src/timeline/algorithms.rs index edf3464bee5..0ef50e1eea9 100644 --- a/crates/matrix-sdk-ui/src/timeline/algorithms.rs +++ b/crates/matrix-sdk-ui/src/timeline/algorithms.rs @@ -39,7 +39,8 @@ impl EventTimelineItemWithId<'_> { /// Create a clone of the underlying [`TimelineItem`] with the given /// reactions. pub fn with_reactions(&self, reactions: ReactionsByKeyBySender) -> Arc { - let event_item = self.inner.with_reactions(reactions); + let content = self.inner.content().with_reactions(reactions); + let event_item = self.inner.with_content(content); TimelineItem::new(event_item, self.internal_id.clone()) } } diff --git a/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs new file mode 100644 index 00000000000..db93966518a --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/controller/aggregations.rs @@ -0,0 +1,460 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! An aggregation manager for the timeline. +//! +//! An aggregation is an event that relates to another event: for instance, a +//! reaction, a poll response, and so on and so forth. +//! +//! Because of the sync mechanisms and federation, it can happen that a related +//! event is received *before* receiving the event it relates to. Those events +//! must be accounted for, stashed somewhere, and reapplied later, if/when the +//! related-to event shows up. +//! +//! In addition to that, a room's event cache can also decide to move events +//! around, in its own internal representation (likely because it ran into some +//! duplicate events). When that happens, a timeline opened on the given room +//! will see a removal then re-insertion of the given event. If that event was +//! the target of aggregations, then those aggregations must be re-applied when +//! the given event is reinserted. +//! +//! To satisfy both requirements, the [`Aggregations`] "manager" object provided +//! by this module will take care of memoizing aggregations, for the entire +//! lifetime of the timeline (or until it's [`Aggregations::clear()`]'ed by some +//! caller). Aggregations are saved in memory, and have the same lifetime as +//! that of a timeline. This makes it possible to apply pending aggregations +//! to cater for the first use case, and to never lose any aggregations in the +//! second use case. + +use std::collections::HashMap; + +use ruma::{MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId}; +use tracing::{trace, warn}; + +use super::{rfind_event_by_item_id, ObservableItemsTransaction}; +use crate::timeline::{ + PollState, ReactionInfo, ReactionStatus, TimelineEventItemId, TimelineItem, TimelineItemContent, +}; + +/// Which kind of aggregation (related event) is this? +#[derive(Clone, Debug)] +pub(crate) enum AggregationKind { + /// This is a response to a poll. + PollResponse { + /// Sender of the poll's response. + sender: OwnedUserId, + /// Timestamp at which the response has beens ent. + timestamp: MilliSecondsSinceUnixEpoch, + /// All the answers to the poll sent by the sender. + answers: Vec, + }, + + /// This is the marker of the end of a poll. + PollEnd { + /// Timestamp at which the poll ends, i.e. all the responses with a + /// timestamp prior to this one should be taken into account + /// (and all the responses with a timestamp after this one + /// should be dropped). + end_date: MilliSecondsSinceUnixEpoch, + }, + + /// This is a reaction to another event. + Reaction { + /// The reaction "key" displayed by the client, often an emoji. + key: String, + /// Sender of the reaction. + sender: OwnedUserId, + /// Timestamp at which the reaction has been sent. + timestamp: MilliSecondsSinceUnixEpoch, + /// The send status of the reaction this is, with handles to abort it if + /// we can, etc. + reaction_status: ReactionStatus, + }, +} + +/// An aggregation is an event related to another event (for instance a +/// reaction, a poll's response, etc.). +/// +/// It can be either a local or a remote echo. +#[derive(Clone, Debug)] +pub(crate) struct Aggregation { + /// The kind of aggregation this represents. + pub kind: AggregationKind, + + /// The own timeline identifier for an aggregation. + /// + /// It will be a transaction id when the aggregation is still a local echo, + /// and it will transition into an event id when the aggregation is a + /// remote echo (i.e. has been received in a sync response): + pub own_id: TimelineEventItemId, +} + +/// Get the poll state from a given [`TimelineItemContent`]. +fn poll_state_from_item( + content: &mut TimelineItemContent, +) -> Result<&mut PollState, AggregationError> { + match content { + TimelineItemContent::Poll(poll_state) => Ok(poll_state), + _ => Err(AggregationError::InvalidType { + expected: "a poll".to_owned(), + actual: content.debug_string().to_owned(), + }), + } +} + +impl Aggregation { + /// Create a new [`Aggregation`]. + pub fn new(own_id: TimelineEventItemId, kind: AggregationKind) -> Self { + Self { kind, own_id } + } + + /// Apply an aggregation in-place to a given [`TimelineItemContent`]. + /// + /// In case of success, returns an enum indicating whether the applied + /// aggregation had an effect on the content; if it updated it, then the + /// caller has the responsibility to reflect that change. + /// + /// In case of error, returns an error detailing why the aggregation + /// couldn't be applied. + pub fn apply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult { + match &self.kind { + AggregationKind::PollResponse { sender, timestamp, answers } => { + let state = match poll_state_from_item(content) { + Ok(state) => state, + Err(err) => return ApplyAggregationResult::Error(err), + }; + state.add_response(sender.clone(), *timestamp, answers.clone()); + ApplyAggregationResult::UpdatedItem + } + + AggregationKind::PollEnd { end_date } => { + let poll_state = match poll_state_from_item(content) { + Ok(state) => state, + Err(err) => return ApplyAggregationResult::Error(err), + }; + if !poll_state.end(*end_date) { + return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded); + } + ApplyAggregationResult::UpdatedItem + } + + AggregationKind::Reaction { key, sender, timestamp, reaction_status } => { + let Some(reactions) = content.reactions_mut() else { + // These items don't hold reactions. + return ApplyAggregationResult::LeftItemIntact; + }; + + let previous_reaction = reactions.entry(key.clone()).or_default().insert( + sender.clone(), + ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() }, + ); + + let is_same = previous_reaction.is_some_and(|prev| { + prev.timestamp == *timestamp + && matches!( + (prev.status, reaction_status), + (ReactionStatus::LocalToLocal(_), ReactionStatus::LocalToLocal(_)) + | ( + ReactionStatus::LocalToRemote(_), + ReactionStatus::LocalToRemote(_), + ) + | ( + ReactionStatus::RemoteToRemote(_), + ReactionStatus::RemoteToRemote(_), + ) + ) + }); + + if is_same { + ApplyAggregationResult::LeftItemIntact + } else { + ApplyAggregationResult::UpdatedItem + } + } + } + } + + /// Undo an aggregation in-place to a given [`TimelineItemContent`]. + /// + /// In case of success, returns an enum indicating whether unapplying the + /// aggregation had an effect on the content; if it updated it, then the + /// caller has the responsibility to reflect that change. + /// + /// In case of error, returns an error detailing why the aggregation + /// couldn't be unapplied. + pub fn unapply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult { + match &self.kind { + AggregationKind::PollResponse { sender, timestamp, .. } => { + let state = match poll_state_from_item(content) { + Ok(state) => state, + Err(err) => return ApplyAggregationResult::Error(err), + }; + state.remove_response(sender, *timestamp); + ApplyAggregationResult::UpdatedItem + } + + AggregationKind::PollEnd { .. } => { + // Assume we can't undo a poll end event at the moment. + ApplyAggregationResult::Error(AggregationError::CantUndoPollEnd) + } + + AggregationKind::Reaction { key, sender, .. } => { + let Some(reactions) = content.reactions_mut() else { + // An item that doesn't hold any reactions. + return ApplyAggregationResult::LeftItemIntact; + }; + + let by_user = reactions.get_mut(key); + let previous_entry = if let Some(by_user) = by_user { + let prev = by_user.swap_remove(sender); + // If this was the last reaction, remove the entire map for this key. + if by_user.is_empty() { + reactions.swap_remove(key); + } + prev + } else { + None + }; + + if previous_entry.is_some() { + ApplyAggregationResult::UpdatedItem + } else { + ApplyAggregationResult::LeftItemIntact + } + } + } + } +} + +/// Manager for all known existing aggregations to all events in the timeline. +#[derive(Clone, Debug, Default)] +pub(crate) struct Aggregations { + /// Mapping of a target event to its list of aggregations. + related_events: HashMap>, + + /// Mapping of a related event identifier to its target. + inverted_map: HashMap, +} + +impl Aggregations { + /// Clear all the known aggregations from all the mappings. + pub fn clear(&mut self) { + self.related_events.clear(); + self.inverted_map.clear(); + } + + /// Add a given aggregation that relates to the [`TimelineItemContent`] + /// identified by the given [`TimelineEventItemId`]. + pub fn add(&mut self, related_to: TimelineEventItemId, aggregation: Aggregation) { + self.inverted_map.insert(aggregation.own_id.clone(), related_to.clone()); + self.related_events.entry(related_to).or_default().push(aggregation); + } + + /// Is the given id one for a known aggregation to another event? + /// + /// If so, returns the target event identifier as well as the aggregation. + /// The aggregation must be unapplied on the corresponding timeline + /// item. + #[must_use] + pub fn try_remove_aggregation( + &mut self, + aggregation_id: &TimelineEventItemId, + ) -> Option<(&TimelineEventItemId, Aggregation)> { + let found = self.inverted_map.get(aggregation_id)?; + + // Find and remove the aggregation in the other mapping. + let aggregation = if let Some(aggregations) = self.related_events.get_mut(found) { + let removed = aggregations + .iter() + .position(|agg| agg.own_id == *aggregation_id) + .map(|idx| aggregations.remove(idx)); + + // If this was the last aggregation, remove the entry in the `related_events` + // mapping. + if aggregations.is_empty() { + self.related_events.remove(found); + } + + removed + } else { + None + }; + + if aggregation.is_none() { + warn!("incorrect internal state: {aggregation_id:?} was present in the inverted map, not in related-to map."); + } + + Some((found, aggregation?)) + } + + /// Apply all the aggregations to a [`TimelineItemContent`]. + /// + /// Will return an error at the first aggregation that couldn't be applied; + /// see [`Aggregation::apply`] which explains under which conditions it can + /// happen. + pub fn apply( + &self, + item_id: &TimelineEventItemId, + content: &mut TimelineItemContent, + ) -> Result<(), AggregationError> { + let Some(aggregations) = self.related_events.get(item_id) else { + return Ok(()); + }; + for a in aggregations { + if let ApplyAggregationResult::Error(err) = a.apply(content) { + return Err(err); + } + } + Ok(()) + } + + /// Mark a target event as being sent (i.e. it transitions from an local + /// transaction id to its remote event id counterpart), by updating the + /// internal mappings. + pub fn mark_target_as_sent(&mut self, txn_id: OwnedTransactionId, event_id: OwnedEventId) { + let from = TimelineEventItemId::TransactionId(txn_id); + let to = TimelineEventItemId::EventId(event_id); + + // Update the aggregations in the `related_events` field. + if let Some(aggregations) = self.related_events.remove(&from) { + // Update the inverted mappings (from aggregation's id, to the new target id). + for a in &aggregations { + if let Some(prev_target) = self.inverted_map.remove(&a.own_id) { + debug_assert_eq!(prev_target, from); + self.inverted_map.insert(a.own_id.clone(), to.clone()); + } + } + // Update the direct mapping of target -> aggregations. + self.related_events.entry(to).or_default().extend(aggregations); + } + } + + /// Mark an aggregation event as being sent (i.e. it transitions from an + /// local transaction id to its remote event id counterpart), by + /// updating the internal mappings. + /// + /// When an aggregation has been marked as sent, it may need to be reapplied + /// to the corresponding [`TimelineItemContent`]; in this case, a + /// [`MarkAggregationSentResult::MarkedSent`] result with a set `update` + /// will be returned, and must be applied. + pub fn mark_aggregation_as_sent( + &mut self, + txn_id: OwnedTransactionId, + event_id: OwnedEventId, + ) -> MarkAggregationSentResult { + let from = TimelineEventItemId::TransactionId(txn_id); + let to = TimelineEventItemId::EventId(event_id.clone()); + + let Some(target) = self.inverted_map.remove(&from) else { + return MarkAggregationSentResult::NotFound; + }; + + let mut target_and_new_aggregation = MarkAggregationSentResult::MarkedSent { update: None }; + + if let Some(aggregations) = self.related_events.get_mut(&target) { + if let Some(found) = aggregations.iter_mut().find(|agg| agg.own_id == from) { + found.own_id = to.clone(); + + match &mut found.kind { + AggregationKind::PollResponse { .. } | AggregationKind::PollEnd { .. } => { + // Nothing particular to do. + } + AggregationKind::Reaction { reaction_status, .. } => { + // Mark the reaction as becoming remote, and signal that update to the + // caller. + *reaction_status = ReactionStatus::RemoteToRemote(event_id); + target_and_new_aggregation = MarkAggregationSentResult::MarkedSent { + update: Some((target.clone(), found.clone())), + }; + } + } + } + } + + self.inverted_map.insert(to, target); + + target_and_new_aggregation + } +} + +/// Find an item identified by the target identifier, and apply the aggregation +/// onto it. +/// +/// Returns whether the aggregation has been applied or not (so as to increment +/// a number of updated result, for instance). +pub(crate) fn find_item_and_apply_aggregation( + items: &mut ObservableItemsTransaction<'_>, + target: &TimelineEventItemId, + aggregation: Aggregation, +) -> bool { + let Some((idx, event_item)) = rfind_event_by_item_id(items, target) else { + warn!("couldn't find aggregation's target {target:?}"); + return false; + }; + + let mut new_content = event_item.content().clone(); + + match aggregation.apply(&mut new_content) { + ApplyAggregationResult::UpdatedItem => { + trace!("applied aggregation"); + let new_item = event_item.with_content(new_content); + items.replace(idx, TimelineItem::new(new_item, event_item.internal_id.to_owned())); + true + } + ApplyAggregationResult::LeftItemIntact => { + trace!("applying the aggregation had no effect"); + false + } + ApplyAggregationResult::Error(err) => { + warn!("error when applying aggregation: {err}"); + false + } + } +} + +/// The result of marking an aggregation as sent. +pub(crate) enum MarkAggregationSentResult { + /// The aggregation has been found, and marked as sent. + /// + /// Optionally, it can include an [`Aggregation`] `update` to the matching + /// [`TimelineItemContent`] item identified by the [`TimelineEventItemId`]. + MarkedSent { update: Option<(TimelineEventItemId, Aggregation)> }, + /// The aggregation was unknown to the aggregations manager, aka not found. + NotFound, +} + +/// The result of applying (or unapplying) an aggregation onto a timeline item. +pub(crate) enum ApplyAggregationResult { + /// The item has been updated after applying the aggregation. + UpdatedItem, + + /// The item hasn't been modified after applying the aggregation, because it + /// was likely already applied prior to this. + LeftItemIntact, + + /// An error happened while applying the aggregation. + Error(AggregationError), +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum AggregationError { + #[error("trying to end a poll twice")] + PollAlreadyEnded, + + #[error("a poll end can't be unapplied")] + CantUndoPollEnd, + + #[error("trying to apply an aggregation of one type to an invalid target: expected {expected}, actual {actual}")] + InvalidType { expected: String, actual: String }, +} diff --git a/crates/matrix-sdk-ui/src/timeline/controller/metadata.rs b/crates/matrix-sdk-ui/src/timeline/controller/metadata.rs index b02c68e67df..c60b8876235 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/metadata.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/metadata.rs @@ -20,12 +20,11 @@ use tracing::trace; use super::{ super::{ - reactions::Reactions, rfind_event_by_id, subscriber::skip::SkipCount, TimelineItem, - TimelineItemKind, TimelineUniqueId, + rfind_event_by_id, subscriber::skip::SkipCount, TimelineItem, TimelineItemKind, + TimelineUniqueId, }, read_receipts::ReadReceipts, - state::PendingPollEvents, - AllRemoteEvents, ObservableItemsTransaction, PendingEdit, + Aggregations, AllRemoteEvents, ObservableItemsTransaction, PendingEdit, }; use crate::unable_to_decrypt_hook::UtdHookManager; @@ -71,12 +70,8 @@ pub(in crate::timeline) struct TimelineMetadata { /// the device has terabytes of RAM. next_internal_id: u64, - /// State helping matching reactions to their associated events, and - /// stashing pending reactions. - pub reactions: Reactions, - - /// Associated poll events received before their original poll start event. - pub pending_poll_events: PendingPollEvents, + /// Aggregation metadata and pending aggregations. + pub aggregations: Aggregations, /// Edit events received before the related event they're editing. pub pending_edits: RingBuffer, @@ -115,8 +110,7 @@ impl TimelineMetadata { subscriber_skip_count: SkipCount::new(), own_user_id, next_internal_id: Default::default(), - reactions: Default::default(), - pending_poll_events: Default::default(), + aggregations: Default::default(), pending_edits: RingBuffer::new(MAX_NUM_STASHED_PENDING_EDITS), fully_read_event: Default::default(), // It doesn't make sense to set this to false until we fill the `fully_read_event` @@ -133,8 +127,7 @@ impl TimelineMetadata { pub(super) fn clear(&mut self) { // Note: we don't clear the next internal id to avoid bad cases of stale unique // ids across timeline clears. - self.reactions.clear(); - self.pending_poll_events.clear(); + self.aggregations.clear(); self.pending_edits.clear(); self.fully_read_event = None; // We forgot about the fully read marker right above, so wait for a new one diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index d0cf8d66ecb..f98ff0535cf 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -71,8 +71,8 @@ use super::{ subscriber::TimelineSubscriber, traits::{Decryptor, RoomDataProvider}, DateDividerMode, Error, EventSendState, EventTimelineItem, InReplyToDetails, Message, - PaginationError, Profile, ReactionInfo, RepliedToEvent, TimelineDetails, TimelineEventItemId, - TimelineFocus, TimelineItem, TimelineItemContent, TimelineItemKind, + PaginationError, Profile, RepliedToEvent, TimelineDetails, TimelineEventItemId, TimelineFocus, + TimelineItem, TimelineItemContent, TimelineItemKind, }; use crate::{ timeline::{ @@ -80,18 +80,20 @@ use crate::{ date_dividers::DateDividerAdjuster, event_item::EventTimelineItemKind, pinned_events_loader::{PinnedEventsLoader, PinnedEventsLoaderError}, - reactions::FullReactionKey, TimelineEventFilterFn, }, unable_to_decrypt_hook::UtdHookManager, }; +mod aggregations; mod metadata; mod observable_items; mod read_receipts; mod state; mod state_transaction; +pub(super) use aggregations::*; + /// Data associated to the current timeline focus. #[derive(Debug)] enum TimelineFocusData { @@ -552,6 +554,7 @@ impl TimelineController

{ let user_id = self.room_data_provider.own_user_id(); let prev_status = item + .content() .reactions() .get(key) .and_then(|group| group.get(user_id)) @@ -631,7 +634,7 @@ impl TimelineController

{ return Ok(false); }; - let mut reactions = item.reactions().clone(); + let mut reactions = item.content().reactions().clone(); let reaction_info = reactions.remove_reaction(user_id, key); if reaction_info.is_some() { @@ -654,7 +657,7 @@ impl TimelineController

{ rfind_event_by_id(&state.items, &annotated_event_id) { // Re-add the reaction to the mapping. - let mut reactions = item.reactions().clone(); + let mut reactions = item.content().reactions(); reactions .entry(key.to_owned()) .or_default() @@ -845,31 +848,49 @@ impl TimelineController

{ }); let Some((idx, item)) = result else { - // Event isn't found as a proper item. Try to find it as a reaction? - if let Some((event_id, reaction_key)) = new_event_id.zip( - txn.meta.reactions.map.get(&TimelineEventItemId::TransactionId(txn_id.to_owned())), - ) { - match &reaction_key.item { - TimelineEventItemId::TransactionId(_) => { - error!("unexpected remote reaction to local echo") - } - TimelineEventItemId::EventId(reacted_to_event_id) => { - if let Some((item_pos, event_item)) = - rfind_event_by_id(&txn.items, reacted_to_event_id) - { - let mut reactions = event_item.reactions().clone(); - if let Some(entry) = reactions - .get_mut(&reaction_key.key) - .and_then(|by_user| by_user.get_mut(&reaction_key.sender)) + // Event wasn't found as a standalone item. + // + // If it was just sent, try to find if it matches a corresponding aggregation, + // and mark it as sent in that case. + if let Some(new_event_id) = new_event_id { + match txn + .meta + .aggregations + .mark_aggregation_as_sent(txn_id.to_owned(), new_event_id.to_owned()) + { + MarkAggregationSentResult::MarkedSent { update } => { + trace!("marked aggregation as sent"); + + if let Some((target, aggregation)) = update { + if let Some((item_pos, item)) = + rfind_event_by_item_id(&txn.items, &target) { - trace!("updated reaction status to sent"); - entry.status = ReactionStatus::RemoteToRemote(event_id.to_owned()); - txn.items.replace(item_pos, event_item.with_reactions(reactions)); - txn.commit(); - return; + let mut content = item.content().clone(); + match aggregation.apply(&mut content) { + ApplyAggregationResult::UpdatedItem => { + trace!("reapplied aggregation in the event"); + let internal_id = item.internal_id.to_owned(); + let new_item = item.with_content(content); + txn.items.replace( + item_pos, + TimelineItem::new(new_item, internal_id), + ); + txn.commit(); + } + ApplyAggregationResult::LeftItemIntact => {} + ApplyAggregationResult::Error(err) => { + warn!("when reapplying aggregation just marked as sent: {err}"); + } + } } } + + // Early return: we've found the event to mark as sent, it was an + // aggregation. + return; } + + MarkAggregationSentResult::NotFound => {} } } @@ -878,7 +899,7 @@ impl TimelineController

{ }; let Some(local_item) = item.as_local() else { - warn!("We looked for a local item, but it transitioned to remote??"); + warn!("We looked for a local item, but it transitioned to remote."); return; }; @@ -888,25 +909,10 @@ impl TimelineController

{ error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent"); } - // If the event had local reactions, upgrade the mapping from reaction to - // events, to indicate that the event is now remote. + // If the event has just been marked as sent, update the aggregations mapping to + // take that into account. if let Some(new_event_id) = new_event_id { - let reactions = item.reactions(); - for (_key, by_user) in reactions.iter() { - for (_user_id, info) in by_user.iter() { - if let ReactionStatus::LocalToLocal(Some(reaction_handle)) = &info.status { - let reaction_txn_id = reaction_handle.transaction_id().to_owned(); - if let Some(found) = txn - .meta - .reactions - .map - .get_mut(&TimelineEventItemId::TransactionId(reaction_txn_id)) - { - found.item = TimelineEventItemId::EventId(new_event_id.to_owned()); - } - } - } - } + txn.meta.aggregations.mark_target_as_sent(txn_id.to_owned(), new_event_id.to_owned()); } let new_item = item.with_inner_kind(local_item.with_send_state(send_state)); @@ -934,41 +940,42 @@ impl TimelineController

{ txn.commit(); - debug!("Discarded local echo"); + debug!("discarded local echo"); return true; } - // Look if this was a local reaction echo. - if let Some(full_key) = - state.meta.reactions.map.remove(&TimelineEventItemId::TransactionId(txn_id.to_owned())) - { - let item = match &full_key.item { - TimelineEventItemId::TransactionId(txn_id) => { - rfind_event_item(&state.items, |item| item.transaction_id() == Some(txn_id)) - } - TimelineEventItemId::EventId(event_id) => rfind_event_by_id(&state.items, event_id), - }; + // Avoid multiple mutable and immutable borrows of the lock guard by explicitly + // dereferencing it once. + let state = &mut *state; - let Some((idx, item)) = item else { - warn!("missing reacted-to item for a reaction"); + // Look if this was a local aggregation. + if let Some((target, aggregation)) = state + .meta + .aggregations + .try_remove_aggregation(&TimelineEventItemId::TransactionId(txn_id.to_owned())) + { + let Some((item_pos, item)) = rfind_event_by_item_id(&state.items, target) else { + warn!("missing target item for a local aggregation"); return false; }; - let mut reactions = item.reactions().clone(); - if reactions.remove_reaction(&full_key.sender, &full_key.key).is_some() { - let updated_item = item.with_reactions(reactions); - state.items.replace(idx, updated_item); - } else { - warn!( - "missing reaction {} for sender {} on timeline item", - full_key.key, full_key.sender - ); + let mut content = item.content().clone(); + match aggregation.unapply(&mut content) { + ApplyAggregationResult::UpdatedItem => { + trace!("removed local reaction to local echo"); + let internal_id = item.internal_id.clone(); + let new_item = item.with_content(content); + state.items.replace(item_pos, TimelineItem::new(new_item, internal_id)); + } + ApplyAggregationResult::LeftItemIntact => {} + ApplyAggregationResult::Error(err) => { + warn!("when undoing local aggregation: {err}"); + } } return true; } - debug!("Can't find local echo to discard"); false } @@ -1007,10 +1014,14 @@ impl TimelineController

{ }; // Replace the local-related state (kind) and the content state. + let prev_reactions = prev_item.content().reactions(); let new_item = TimelineItem::new( - prev_item - .with_kind(ti_kind) - .with_content(TimelineItemContent::message(content, None, &txn.items)), + prev_item.with_kind(ti_kind).with_content(TimelineItemContent::message( + content, + None, + &txn.items, + prev_reactions, + )), prev_item.internal_id.to_owned(), ); @@ -1346,39 +1357,26 @@ impl TimelineController

{ applies_to: OwnedTransactionId, ) { let mut state = self.state.write().await; + let mut tr = state.transaction(); - let Some((item_pos, item)) = - rfind_event_item(&state.items, |item| item.transaction_id() == Some(&applies_to)) - else { - warn!("Local item not found anymore."); - return; - }; - - let user_id = self.room_data_provider.own_user_id(); + let target = TimelineEventItemId::TransactionId(applies_to); let reaction_txn_id = send_handle.transaction_id().to_owned(); - let reaction_info = ReactionInfo { - timestamp: MilliSecondsSinceUnixEpoch::now(), - status: ReactionStatus::LocalToLocal(Some(send_handle)), - }; - - let mut reactions = item.reactions().clone(); - let by_user = reactions.entry(reaction_key.clone()).or_default(); - by_user.insert(user_id.to_owned(), reaction_info); - - trace!("Adding local reaction to local echo"); - let new_item = item.with_reactions(reactions); - state.items.replace(item_pos, new_item); - - // Add it to the reaction map, so we can discard it later if needs be. - state.meta.reactions.map.insert( + let reaction_status = ReactionStatus::LocalToLocal(Some(send_handle)); + let aggregation = Aggregation::new( TimelineEventItemId::TransactionId(reaction_txn_id), - FullReactionKey { - item: TimelineEventItemId::TransactionId(applies_to), - key: reaction_key, - sender: user_id.to_owned(), + AggregationKind::Reaction { + key: reaction_key.clone(), + sender: self.room_data_provider.own_user_id().to_owned(), + timestamp: MilliSecondsSinceUnixEpoch::now(), + reaction_status, }, ); + + tr.meta.aggregations.add(target.clone(), aggregation.clone()); + find_item_and_apply_aggregation(&mut tr.items, &target, aggregation); + + tr.commit(); } /// Handle a single room send queue update. diff --git a/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs index 5d1094f4c31..2d3125452de 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/observable_items.rs @@ -404,6 +404,7 @@ mod observable_items_tests { thread_root: None, edited: false, mentions: None, + reactions: Default::default(), }), EventTimelineItemKind::Remote(RemoteEventTimelineItem { event_id: event_id.parse().unwrap(), @@ -416,7 +417,6 @@ mod observable_items_tests { latest_edit_json: None, origin: RemoteEventOrigin::Sync, }), - Default::default(), false, ), TimelineUniqueId(format!("__id_{event_id}")), diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index e00002ab4b0..66a9ea8f221 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{collections::HashMap, future::Future, sync::Arc}; +use std::{future::Future, sync::Arc}; use eyeball_im::VectorDiff; use matrix_sdk::{deserialized_responses::TimelineEvent, send_queue::SendHandle}; @@ -20,12 +20,8 @@ use matrix_sdk::{deserialized_responses::TimelineEvent, send_queue::SendHandle}; use ruma::events::receipt::ReceiptEventContent; use ruma::{ events::{ - poll::{ - unstable_response::UnstablePollResponseEventContent, - unstable_start::NewUnstablePollStartEventContentWithoutRelation, - }, - relation::Replacement, - room::message::RoomMessageEventContentWithoutRelation, + poll::unstable_start::NewUnstablePollStartEventContentWithoutRelation, + relation::Replacement, room::message::RoomMessageEventContentWithoutRelation, AnySyncEphemeralRoomEvent, AnySyncTimelineEvent, }, serde::Raw, @@ -41,7 +37,7 @@ use super::{ Flow, TimelineEventContext, TimelineEventHandler, TimelineEventKind, TimelineItemPosition, }, - event_item::{PollState, RemoteEventOrigin, ResponseData}, + event_item::RemoteEventOrigin, traits::RoomDataProvider, Profile, TimelineItem, }, @@ -289,58 +285,6 @@ impl TimelineState { } } -/// Cache holding poll response and end events handled before their poll start -/// event has been handled. -#[derive(Clone, Debug, Default)] -pub(in crate::timeline) struct PendingPollEvents { - /// Responses to a poll (identified by the poll's start event id). - responses: HashMap>, - - /// Mapping of a poll (identified by its start event's id) to its end date. - end_dates: HashMap, -} - -impl PendingPollEvents { - pub(crate) fn add_response( - &mut self, - start_event_id: &EventId, - sender: &UserId, - timestamp: MilliSecondsSinceUnixEpoch, - content: &UnstablePollResponseEventContent, - ) { - self.responses.entry(start_event_id.to_owned()).or_default().push(ResponseData { - sender: sender.to_owned(), - timestamp, - answers: content.poll_response.answers.clone(), - }); - } - - pub(crate) fn clear(&mut self) { - self.end_dates.clear(); - self.responses.clear(); - } - - /// Mark a poll as finished by inserting its poll date. - pub(crate) fn mark_as_ended( - &mut self, - start_event_id: &EventId, - timestamp: MilliSecondsSinceUnixEpoch, - ) { - self.end_dates.insert(start_event_id.to_owned(), timestamp); - } - - /// Dumps all response and end events present in the cache that belong to - /// the given start_event_id into the given poll_state. - pub(crate) fn apply_pending(&mut self, start_event_id: &EventId, poll_state: &mut PollState) { - if let Some(pending_responses) = self.responses.remove(start_event_id) { - poll_state.response_data.extend(pending_responses); - } - if let Some(pending_end) = self.end_dates.remove(start_event_id) { - poll_state.end_event_timestamp = Some(pending_end); - } - } -} - #[derive(Clone)] pub(in crate::timeline) enum PendingEditKind { RoomMessage(Replacement), diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs index fff591bcf18..c7a56d45cfe 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs @@ -433,7 +433,7 @@ impl<'a> TimelineStateTransaction<'a> { if let Some(event_meta) = self.items.all_remote_events().get(event_index) { // Fetch the `timeline_item_index` associated to the remote event. if let Some(timeline_item_index) = event_meta.timeline_item_index { - let _removed_timeline_item = self.items.remove(timeline_item_index); + let _ = self.items.remove(timeline_item_index); } // Now we can remove the remote event. diff --git a/crates/matrix-sdk-ui/src/timeline/date_dividers.rs b/crates/matrix-sdk-ui/src/timeline/date_dividers.rs index 55ac269433c..cbbed7665ef 100644 --- a/crates/matrix-sdk-ui/src/timeline/date_dividers.rs +++ b/crates/matrix-sdk-ui/src/timeline/date_dividers.rs @@ -667,7 +667,6 @@ mod tests { timestamp, TimelineItemContent::RedactedMessage, event_kind, - Default::default(), false, ) } diff --git a/crates/matrix-sdk-ui/src/timeline/event_handler.rs b/crates/matrix-sdk-ui/src/timeline/event_handler.rs index e291eab07a9..b9e960a3084 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_handler.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_handler.rs @@ -51,8 +51,9 @@ use ruma::{ use tracing::{debug, error, field::debug, info, instrument, trace, warn}; use super::{ - algorithms::rfind_event_by_id, + algorithms::{rfind_event_by_id, rfind_event_by_item_id}, controller::{ + find_item_and_apply_aggregation, Aggregation, AggregationKind, ApplyAggregationResult, ObservableItemsTransaction, ObservableItemsTransactionEntry, PendingEdit, PendingEditKind, TimelineMetadata, TimelineStateTransaction, }, @@ -60,13 +61,12 @@ use super::{ event_item::{ extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, AnyOtherFullStateEventContent, EventSendState, EventTimelineItemKind, - LocalEventTimelineItem, PollState, Profile, ReactionInfo, ReactionStatus, - ReactionsByKeyBySender, RemoteEventOrigin, RemoteEventTimelineItem, TimelineEventItemId, + LocalEventTimelineItem, PollState, Profile, RemoteEventOrigin, RemoteEventTimelineItem, + TimelineEventItemId, }, - reactions::{FullReactionKey, PendingReaction}, traits::RoomDataProvider, - EventTimelineItem, InReplyToDetails, OtherState, RepliedToEvent, Sticker, TimelineDetails, - TimelineItem, TimelineItemContent, + EventTimelineItem, InReplyToDetails, OtherState, ReactionStatus, RepliedToEvent, Sticker, + TimelineDetails, TimelineItem, TimelineItemContent, }; use crate::events::SyncTimelineEventWithoutContent; @@ -104,6 +104,14 @@ impl Flow { as_variant!(self, Flow::Remote { event_id, .. } => event_id) } + /// Returns the [`TimelineEventItemId`] associated to this future item. + pub(crate) fn timeline_item_id(&self) -> TimelineEventItemId { + match self { + Flow::Remote { event_id, .. } => TimelineEventItemId::EventId(event_id.clone()), + Flow::Local { txn_id, .. } => TimelineEventItemId::TransactionId(txn_id.clone()), + } + } + /// If the flow is remote, returns the associated full raw event. pub(crate) fn raw_event(&self) -> Option<&Raw> { as_variant!(self, Flow::Remote { raw_event, .. } => raw_event) @@ -404,7 +412,13 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { AnyMessageLikeEventContent::Sticker(content) => { if should_add { - self.add_item(TimelineItemContent::Sticker(Sticker { content }), None); + self.add_item( + TimelineItemContent::Sticker(Sticker { + content, + reactions: Default::default(), + }), + None, + ); } } @@ -564,7 +578,10 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { let edit_json = edit_json.flatten(); - self.add_item(TimelineItemContent::message(msg, edit_content, self.items), edit_json); + self.add_item( + TimelineItemContent::message(msg, edit_content, self.items, Default::default()), + edit_json, + ); } #[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))] @@ -695,83 +712,39 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { Some(new_item) } - // Redacted reaction events are no-ops so don't need to be handled + /// Apply a reaction to a *remote* event. + /// + /// Reactions to local events are applied in + /// [`crate::timeline::TimelineController::handle_local_echo`]. #[instrument(skip_all, fields(relates_to_event_id = ?c.relates_to.event_id))] fn handle_reaction(&mut self, c: ReactionEventContent) { - let reacted_to_event_id = &c.relates_to.event_id; + let target = TimelineEventItemId::EventId(c.relates_to.event_id); - let (reaction_id, send_handle, old_txn_id) = match &self.ctx.flow { - Flow::Local { txn_id, send_handle, .. } => { - (TimelineEventItemId::TransactionId(txn_id.clone()), send_handle.clone(), None) + // Add the aggregation to the manager. + let reaction_status = match &self.ctx.flow { + Flow::Local { send_handle, .. } => { + // This is a local echo for a reaction to a remote event. + ReactionStatus::LocalToRemote(send_handle.clone()) } - Flow::Remote { event_id, txn_id, .. } => { - (TimelineEventItemId::EventId(event_id.clone()), None, txn_id.as_ref()) + Flow::Remote { event_id, .. } => { + // This is the remote echo for a reaction to a remote event. + ReactionStatus::RemoteToRemote(event_id.clone()) } }; - - if let Some((idx, event_item)) = rfind_event_by_id(self.items, reacted_to_event_id) { - // Ignore reactions on redacted events. - if let TimelineItemContent::RedactedMessage = event_item.content() { - debug!("Ignoring reaction on redacted event"); - return; - } - - trace!("Added reaction"); - - // Add the reaction to the event item's bundled reactions. - let mut reactions = event_item.reactions.clone(); - - reactions.entry(c.relates_to.key.clone()).or_default().insert( - self.ctx.sender.clone(), - ReactionInfo { - timestamp: self.ctx.timestamp, - status: match &reaction_id { - TimelineEventItemId::TransactionId(_txn_id) => { - ReactionStatus::LocalToRemote(send_handle) - } - TimelineEventItemId::EventId(event_id) => { - ReactionStatus::RemoteToRemote(event_id.clone()) - } - }, - }, - ); - - self.items.replace(idx, event_item.with_reactions(reactions)); - - self.result.items_updated += 1; - } else { - trace!("Timeline item not found, adding reaction to the pending list"); - - let TimelineEventItemId::EventId(reaction_event_id) = reaction_id.clone() else { - error!("Adding local reaction echo to event absent from the timeline"); - return; - }; - - self.meta.reactions.pending.entry(reacted_to_event_id.to_owned()).or_default().insert( - reaction_event_id, - PendingReaction { - key: c.relates_to.key.clone(), - sender_id: self.ctx.sender.clone(), - timestamp: self.ctx.timestamp, - }, - ); - } - - if let Some(txn_id) = old_txn_id { - // Try to remove a local echo of that reaction. It might be missing if the - // reaction wasn't sent by this device, or was sent in a previous - // session. - self.meta.reactions.map.remove(&TimelineEventItemId::TransactionId(txn_id.clone())); - } - - self.meta.reactions.map.insert( - reaction_id, - FullReactionKey { - item: TimelineEventItemId::EventId(c.relates_to.event_id), - sender: self.ctx.sender.clone(), + let aggregation = Aggregation::new( + self.ctx.flow.timeline_item_id(), + AggregationKind::Reaction { key: c.relates_to.key, + sender: self.ctx.sender.clone(), + timestamp: self.ctx.timestamp, + reaction_status, }, ); + + self.meta.aggregations.add(target.clone(), aggregation.clone()); + if find_item_and_apply_aggregation(self.items, &target, aggregation) { + self.result.items_updated += 1; + } } #[instrument(skip_all, fields(replacement_event_id = ?replacement.event_id))] @@ -866,13 +839,7 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { .or(pending_edit) .unzip(); - let mut poll_state = PollState::new(c, edit_content); - - if let Some(event_id) = self.ctx.flow.event_id() { - // Applying the cache to remote events only because local echoes - // don't have an event ID that could be referenced by responses yet. - self.meta.pending_poll_events.apply_pending(event_id, &mut poll_state); - } + let poll_state = PollState::new(c, edit_content, Default::default()); let edit_json = edit_json.flatten(); @@ -880,62 +847,36 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { } fn handle_poll_response(&mut self, c: UnstablePollResponseEventContent) { - let Some((item_pos, item)) = rfind_event_by_id(self.items, &c.relates_to.event_id) else { - self.meta.pending_poll_events.add_response( - &c.relates_to.event_id, - &self.ctx.sender, - self.ctx.timestamp, - &c, - ); - return; - }; - - let TimelineItemContent::Poll(poll_state) = item.content() else { - return; - }; - - let new_item = item.with_content(TimelineItemContent::Poll(poll_state.add_response( - &self.ctx.sender, - self.ctx.timestamp, - &c, - ))); - - trace!("Adding poll response."); - self.items.replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); - self.result.items_updated += 1; + let target = TimelineEventItemId::EventId(c.relates_to.event_id); + let aggregation = Aggregation::new( + self.ctx.flow.timeline_item_id(), + AggregationKind::PollResponse { + sender: self.ctx.sender.clone(), + timestamp: self.ctx.timestamp, + answers: c.poll_response.answers, + }, + ); + self.meta.aggregations.add(target.clone(), aggregation.clone()); + if find_item_and_apply_aggregation(self.items, &target, aggregation) { + self.result.items_updated += 1; + } } fn handle_poll_end(&mut self, c: UnstablePollEndEventContent) { - let Some((item_pos, item)) = rfind_event_by_id(self.items, &c.relates_to.event_id) else { - self.meta.pending_poll_events.mark_as_ended(&c.relates_to.event_id, self.ctx.timestamp); - return; - }; - - let TimelineItemContent::Poll(poll_state) = item.content() else { - return; - }; - - match poll_state.end(self.ctx.timestamp) { - Ok(poll_state) => { - let new_item = item.with_content(TimelineItemContent::Poll(poll_state)); - - trace!("Ending poll."); - self.items - .replace(item_pos, TimelineItem::new(new_item, item.internal_id.to_owned())); - self.result.items_updated += 1; - } - Err(_) => { - info!("Got multiple poll end events, discarding"); - } + let target = TimelineEventItemId::EventId(c.relates_to.event_id); + let aggregation = Aggregation::new( + self.ctx.flow.timeline_item_id(), + AggregationKind::PollEnd { end_date: self.ctx.timestamp }, + ); + self.meta.aggregations.add(target.clone(), aggregation.clone()); + if find_item_and_apply_aggregation(self.items, &target, aggregation) { + self.result.items_updated += 1; } } /// Looks for the redacted event in all the timeline event items, and /// redacts it. /// - /// This only applies to *remote* events; for local items being redacted, - /// use [`Self::handle_reaction_redaction`]. - /// /// This assumes the redacted event was present in the timeline in the first /// place; it will warn if the redacted event has not been found. #[instrument(skip_all, fields(redacts_event_id = ?redacted))] @@ -943,8 +884,8 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { // TODO: Apply local redaction of PollResponse and PollEnd events. // https://github.com/matrix-org/matrix-rust-sdk/pull/2381#issuecomment-1689647825 - // If it's a reaction that's being redacted, handle it here. - if self.handle_reaction_redaction(TimelineEventItemId::EventId(redacted.clone())) { + // If it's an aggregation that's being redacted, handle it here. + if self.handle_aggregation_redaction(redacted.clone()) { // When we have raw timeline items, we should not return here anymore, as we // might need to redact the raw item as well. return; @@ -974,43 +915,42 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { }; } - /// Attempts to redact a reaction, local or remote. + /// Attempts to redact an aggregation (e.g. a reaction, a poll response, + /// etc.). /// /// Returns true if it's succeeded. - #[instrument(skip_all, fields(redacts = ?reaction_id))] - fn handle_reaction_redaction(&mut self, reaction_id: TimelineEventItemId) -> bool { - if let Some(FullReactionKey { - item: TimelineEventItemId::EventId(reacted_to_event_id), - key, - sender, - }) = self.meta.reactions.map.remove(&reaction_id) - { - let Some((item_pos, item)) = rfind_event_by_id(self.items, &reacted_to_event_id) else { - // The remote event wasn't in the timeline. - if let TimelineEventItemId::EventId(event_id) = reaction_id { - // Remove any possibly pending reactions to that event, as this redaction would - // affect them. - if let Some(reactions) = - self.meta.reactions.pending.get_mut(&reacted_to_event_id) - { - reactions.swap_remove(&event_id); - } - } - - // We haven't redacted the reaction. - return false; - }; + #[instrument(skip_all, fields(redacts = ?aggregation_id))] + fn handle_aggregation_redaction(&mut self, aggregation_id: OwnedEventId) -> bool { + let aggregation_id = TimelineEventItemId::EventId(aggregation_id); + + let Some((target, aggregation)) = + self.meta.aggregations.try_remove_aggregation(&aggregation_id) + else { + // This wasn't a known aggregation that was redacted. + return false; + }; - let mut reactions = item.reactions.clone(); - if reactions.remove_reaction(&sender, &key).is_some() { - trace!("Removing reaction"); - self.items.replace(item_pos, item.with_reactions(reactions)); - self.result.items_updated += 1; - return true; + if let Some((item_pos, item)) = rfind_event_by_item_id(self.items, target) { + let mut content = item.content().clone(); + match aggregation.unapply(&mut content) { + ApplyAggregationResult::UpdatedItem => { + trace!("removed aggregation"); + let internal_id = item.internal_id.to_owned(); + let new_item = item.with_content(content); + self.items.replace(item_pos, TimelineItem::new(new_item, internal_id)); + self.result.items_updated += 1; + } + ApplyAggregationResult::LeftItemIntact => {} + ApplyAggregationResult::Error(err) => { + warn!("error when unapplying aggregation: {err}"); + } } + } else { + info!("missing related-to item ({target:?}) for aggregation {aggregation_id:?}"); } - false + // In all cases, we noticed this was an aggregation. + true } /// Add a new event item in the timeline. @@ -1026,15 +966,21 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { /// timeline item being added here. fn add_item( &mut self, - content: TimelineItemContent, + mut content: TimelineItemContent, edit_json: Option>, ) { self.result.item_added = true; + // Apply any pending or stashed aggregations. + if let Err(err) = + self.meta.aggregations.apply(&self.ctx.flow.timeline_item_id(), &mut content) + { + warn!("discarding aggregations: {err}"); + } + let sender = self.ctx.sender.to_owned(); let sender_profile = TimelineDetails::from_initial_value(self.ctx.sender_profile.clone()); let timestamp = self.ctx.timestamp; - let reactions = self.pending_reactions(&content).unwrap_or_default(); let kind: EventTimelineItemKind = match &self.ctx.flow { Flow::Local { txn_id, send_handle } => LocalEventTimelineItem { @@ -1083,7 +1029,6 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { timestamp, content, kind, - reactions, is_room_encrypted, ); @@ -1345,36 +1290,6 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> { } }); } - - fn pending_reactions( - &mut self, - content: &TimelineItemContent, - ) -> Option { - // Drop pending reactions if the message is redacted. - if let TimelineItemContent::RedactedMessage = content { - return None; - } - - self.ctx.flow.event_id().and_then(|event_id| { - let reactions = self.meta.reactions.pending.remove(event_id)?; - let mut bundled = ReactionsByKeyBySender::default(); - - for (reaction_event_id, reaction) in reactions { - let group: &mut IndexMap = - bundled.entry(reaction.key).or_default(); - - group.insert( - reaction.sender_id, - ReactionInfo { - timestamp: reaction.timestamp, - status: ReactionStatus::RemoteToRemote(reaction_event_id), - }, - ); - } - - Some(bundled) - }) - } } /// Transfer `TimelineDetails` that weren't available on the original diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/content/message.rs b/crates/matrix-sdk-ui/src/timeline/event_item/content/message.rs index 09ee071f923..61333b434ca 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/content/message.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/content/message.rs @@ -44,7 +44,7 @@ use crate::{ timeline::{ event_item::{EventTimelineItem, Profile, TimelineDetails}, traits::RoomDataProvider, - Error as TimelineError, TimelineItem, + Error as TimelineError, ReactionsByKeyBySender, TimelineItem, }, DEFAULT_SANITIZER_MODE, }; @@ -58,6 +58,7 @@ pub struct Message { pub(in crate::timeline) thread_root: Option, pub(in crate::timeline) edited: bool, pub(in crate::timeline) mentions: Option, + pub(in crate::timeline) reactions: ReactionsByKeyBySender, } impl Message { @@ -66,6 +67,7 @@ impl Message { c: RoomMessageEventContent, edit: Option, timeline_items: &Vector>, + reactions: ReactionsByKeyBySender, ) -> Self { let mut thread_root = None; let in_reply_to = c.relates_to.and_then(|relation| match relation { @@ -87,8 +89,14 @@ impl Message { let mut msgtype = c.msgtype; msgtype.sanitize(DEFAULT_SANITIZER_MODE, remove_reply_fallback); - let mut ret = - Self { msgtype, in_reply_to, thread_root, edited: false, mentions: c.mentions }; + let mut ret = Self { + msgtype, + in_reply_to, + thread_root, + edited: false, + mentions: c.mentions, + reactions, + }; if let Some(edit) = edit { ret.apply_edit(edit); @@ -264,7 +272,7 @@ fn make_relates_to( #[cfg(not(tarpaulin_include))] impl fmt::Debug for Message { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let Self { msgtype: _, in_reply_to, thread_root, edited, mentions: _ } = self; + let Self { msgtype: _, in_reply_to, thread_root, edited, reactions: _, mentions: _ } = self; // since timeline items are logged, don't include all fields here so // people don't leak personal data in bug reports f.debug_struct("Message") @@ -361,10 +369,14 @@ impl RepliedToEvent { return Err(TimelineError::UnsupportedEvent); }; + // Assume we're not interested in reactions in this context. + let reactions = Default::default(); + let content = TimelineItemContent::Message(Message::from_event( c, extract_room_msg_edit_content(event.relations()), &vector![], + reactions, )); let sender = event.sender().to_owned(); let sender_profile = TimelineDetails::from_initial_value( diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs index 9cd83213239..40d39d080be 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/content/mod.rs @@ -69,16 +69,14 @@ mod polls; pub use pinned_events::RoomPinnedEventsChange; -pub(in crate::timeline) use self::{ - message::{ - extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, - }, - polls::ResponseData, +pub(in crate::timeline) use self::message::{ + extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, }; pub use self::{ message::{InReplyToDetails, Message, RepliedToEvent}, polls::{PollResult, PollState}, }; +use super::ReactionsByKeyBySender; /// The content of an [`EventTimelineItem`][super::EventTimelineItem]. #[derive(Clone, Debug)] @@ -213,10 +211,12 @@ impl TimelineItemContent { // `Message::from_event` marks the original event as `Unavailable` if it can't // be found inside the timeline_items. let timeline_items = Vector::new(); + let reactions = Default::default(); TimelineItemContent::Message(Message::from_event( event_content, edit, &timeline_items, + reactions, )) } @@ -249,7 +249,10 @@ impl TimelineItemContent { SyncStickerEvent::Original(event) => { // Grab the content of this event let event_content = event.content.clone(); - TimelineItemContent::Sticker(Sticker { content: event_content }) + TimelineItemContent::Sticker(Sticker { + content: event_content, + reactions: Default::default(), + }) } SyncStickerEvent::Redacted(_) => TimelineItemContent::RedactedMessage, } @@ -276,9 +279,13 @@ impl TimelineItemContent { } }); + // We're not interested in reactions for the latest preview item. + let reactions = Default::default(); + TimelineItemContent::Poll(PollState::new( NewUnstablePollStartEventContent::new(event.content.poll_start().clone()), edit, + reactions, )) } @@ -324,8 +331,9 @@ impl TimelineItemContent { c: RoomMessageEventContent, edit: Option, timeline_items: &Vector>, + reactions: ReactionsByKeyBySender, ) -> Self { - Self::Message(Message::from_event(c, edit, timeline_items)) + Self::Message(Message::from_event(c, edit, timeline_items, reactions)) } #[cfg(not(tarpaulin_include))] // debug-logging functionality @@ -430,6 +438,70 @@ impl TimelineItemContent { Self::FailedToParseMessageLike { .. } | Self::FailedToParseState { .. } => self.clone(), } } + + /// Return the reactions, grouped by key and then by sender, for a given + /// content. + /// + /// Some content kinds can't hold reactions; for these, this function will + /// return `None`. + pub fn reactions(&self) -> ReactionsByKeyBySender { + match self { + TimelineItemContent::Message(message) => message.reactions.clone(), + TimelineItemContent::Sticker(sticker) => sticker.reactions.clone(), + TimelineItemContent::Poll(poll_state) => poll_state.reactions.clone(), + + TimelineItemContent::UnableToDecrypt(..) | TimelineItemContent::RedactedMessage => { + // No reactions for redacted messages or UTDs. + Default::default() + } + + TimelineItemContent::MembershipChange(..) + | TimelineItemContent::ProfileChange(..) + | TimelineItemContent::OtherState(..) + | TimelineItemContent::FailedToParseMessageLike { .. } + | TimelineItemContent::FailedToParseState { .. } + | TimelineItemContent::CallInvite + | TimelineItemContent::CallNotify => { + // No reactions for these kind of items. + Default::default() + } + } + } + + /// Return a mutable handle to the reactions of this item. + /// + /// See also [`Self::reactions()`] to explain the optional return type. + pub(crate) fn reactions_mut(&mut self) -> Option<&mut ReactionsByKeyBySender> { + match self { + TimelineItemContent::Message(message) => Some(&mut message.reactions), + TimelineItemContent::Sticker(sticker) => Some(&mut sticker.reactions), + TimelineItemContent::Poll(poll_state) => Some(&mut poll_state.reactions), + + TimelineItemContent::UnableToDecrypt(..) | TimelineItemContent::RedactedMessage => { + // No reactions for redacted messages or UTDs. + None + } + + TimelineItemContent::MembershipChange(..) + | TimelineItemContent::ProfileChange(..) + | TimelineItemContent::OtherState(..) + | TimelineItemContent::FailedToParseMessageLike { .. } + | TimelineItemContent::FailedToParseState { .. } + | TimelineItemContent::CallInvite + | TimelineItemContent::CallNotify => { + // No reactions for these kind of items. + None + } + } + } + + pub fn with_reactions(&self, reactions: ReactionsByKeyBySender) -> Self { + let mut cloned = self.clone(); + if let Some(r) = cloned.reactions_mut() { + *r = reactions; + } + cloned + } } /// Metadata about an `m.room.encrypted` event that could not be decrypted. @@ -485,6 +557,7 @@ impl EncryptedMessage { #[derive(Clone, Debug)] pub struct Sticker { pub(in crate::timeline) content: StickerEventContent, + pub(in crate::timeline) reactions: ReactionsByKeyBySender, } impl Sticker { diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs b/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs index b758caf124d..28b43c2138b 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/content/polls.rs @@ -20,7 +20,6 @@ use ruma::{ events::poll::{ compile_unstable_poll_results, start::PollKind, - unstable_response::UnstablePollResponseEventContent, unstable_start::{ NewUnstablePollStartEventContent, NewUnstablePollStartEventContentWithoutRelation, UnstablePollStartContentBlock, @@ -30,6 +29,8 @@ use ruma::{ MilliSecondsSinceUnixEpoch, OwnedUserId, UserId, }; +use crate::timeline::ReactionsByKeyBySender; + /// Holds the state of a poll. /// /// This struct should be created for each poll start event handled and then @@ -41,6 +42,7 @@ pub struct PollState { pub(in crate::timeline) response_data: Vec, pub(in crate::timeline) end_event_timestamp: Option, pub(in crate::timeline) has_been_edited: bool, + pub(in crate::timeline) reactions: ReactionsByKeyBySender, } #[derive(Clone, Debug)] @@ -54,12 +56,14 @@ impl PollState { pub(crate) fn new( content: NewUnstablePollStartEventContent, edit: Option, + reactions: ReactionsByKeyBySender, ) -> Self { let mut ret = Self { start_event_content: content, response_data: vec![], end_event_timestamp: None, has_been_edited: false, + reactions, }; if let Some(edit) = edit { @@ -88,31 +92,41 @@ impl PollState { } } + /// Add a response to a poll. pub(crate) fn add_response( - &self, + &mut self, + sender: OwnedUserId, + timestamp: MilliSecondsSinceUnixEpoch, + answers: Vec, + ) { + self.response_data.push(ResponseData { sender, timestamp, answers }); + } + + /// Remove a response from the poll, as identified by its sender and + /// timestamp values. + pub(crate) fn remove_response( + &mut self, sender: &UserId, timestamp: MilliSecondsSinceUnixEpoch, - content: &UnstablePollResponseEventContent, - ) -> Self { - let mut clone = self.clone(); - clone.response_data.push(ResponseData { - sender: sender.to_owned(), - timestamp, - answers: content.poll_response.answers.clone(), - }); - clone + ) { + if let Some(idx) = self + .response_data + .iter() + .position(|resp| resp.sender == sender && resp.timestamp == timestamp) + { + self.response_data.remove(idx); + } } /// Marks the poll as ended. /// - /// If the poll has already ended, returns `Err(())`. - pub(crate) fn end(&self, timestamp: MilliSecondsSinceUnixEpoch) -> Result { + /// Returns false if the poll was already ended, true otherwise. + pub(crate) fn end(&mut self, timestamp: MilliSecondsSinceUnixEpoch) -> bool { if self.end_event_timestamp.is_none() { - let mut clone = self.clone(); - clone.end_event_timestamp = Some(timestamp); - Ok(clone) + self.end_event_timestamp = Some(timestamp); + true } else { - Err(()) + false } } diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 87c1be1ed96..bd0bf6a2151 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -45,7 +45,6 @@ mod remote; pub(super) use self::{ content::{ extract_bundled_edit_event_json, extract_poll_edit_content, extract_room_msg_edit_content, - ResponseData, }, local::LocalEventTimelineItem, remote::{RemoteEventOrigin, RemoteEventTimelineItem}, @@ -71,8 +70,6 @@ pub struct EventTimelineItem { pub(super) sender: OwnedUserId, /// The sender's profile of the event. pub(super) sender_profile: TimelineDetails, - /// All bundled reactions about the event. - pub(super) reactions: ReactionsByKeyBySender, /// The timestamp of the event. pub(super) timestamp: MilliSecondsSinceUnixEpoch, /// The content of the event. @@ -121,11 +118,10 @@ impl EventTimelineItem { timestamp: MilliSecondsSinceUnixEpoch, content: TimelineItemContent, kind: EventTimelineItemKind, - reactions: ReactionsByKeyBySender, is_room_encrypted: bool, ) -> Self { let is_room_encrypted = Some(is_room_encrypted); - Self { sender, sender_profile, timestamp, content, reactions, kind, is_room_encrypted } + Self { sender, sender_profile, timestamp, content, kind, is_room_encrypted } } /// If the supplied low-level [`TimelineEvent`] is suitable for use as the @@ -175,10 +171,6 @@ impl EventTimelineItem { let content = TimelineItemContent::from_latest_event_content(event, room_power_levels_info)?; - // We don't currently bundle any reactions with the main event. This could - // conceivably be wanted in the message preview in future. - let reactions = ReactionsByKeyBySender::default(); - // The message preview probably never needs read receipts. let read_receipts = IndexMap::new(); @@ -219,15 +211,7 @@ impl EventTimelineItem { TimelineDetails::Unavailable }; - Some(Self { - sender, - sender_profile, - timestamp, - content, - kind, - reactions, - is_room_encrypted: None, - }) + Some(Self { sender, sender_profile, timestamp, content, kind, is_room_encrypted: None }) } /// Check whether this item is a local echo. @@ -332,11 +316,6 @@ impl EventTimelineItem { &self.content } - /// Get the reactions of this item. - pub fn reactions(&self) -> &ReactionsByKeyBySender { - &self.reactions - } - /// Get the read receipts of this item. /// /// The key is the ID of a room member and the value are details about the @@ -504,11 +483,6 @@ impl EventTimelineItem { Self { kind: kind.into(), ..self.clone() } } - /// Clone the current event item, and update its `reactions`. - pub fn with_reactions(&self, reactions: ReactionsByKeyBySender) -> Self { - Self { reactions, ..self.clone() } - } - /// Clone the current event item, and update its content. pub(super) fn with_content(&self, new_content: TimelineItemContent) -> Self { let mut new = self.clone(); @@ -562,7 +536,6 @@ impl EventTimelineItem { content, kind, is_room_encrypted: self.is_room_encrypted, - reactions: ReactionsByKeyBySender::default(), } } @@ -765,6 +738,8 @@ pub enum ReactionStatus { /// The handle is missing only in testing contexts. LocalToRemote(Option), /// It's a remote reaction to a remote event. + /// + /// The event id is that of the reaction event (not the target event). RemoteToRemote(OwnedEventId), } diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index bf9dfbe00f3..0eeee46240e 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -72,7 +72,6 @@ pub mod futures; mod item; mod pagination; mod pinned_events_loader; -mod reactions; mod subscriber; #[cfg(test)] mod tests; @@ -400,10 +399,13 @@ impl Timeline { original_message, )) = message_like_event { + // We don't have access to reactions here. + let reactions = Default::default(); ReplyContent::Message(Message::from_event( original_message.content.clone(), extract_room_msg_edit_content(message_like_event.relations()), &self.items().await, + reactions, )) } else { ReplyContent::Raw(raw_sync_event) diff --git a/crates/matrix-sdk-ui/src/timeline/reactions.rs b/crates/matrix-sdk-ui/src/timeline/reactions.rs deleted file mode 100644 index 6062f5c2d7f..00000000000 --- a/crates/matrix-sdk-ui/src/timeline/reactions.rs +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2023 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use indexmap::IndexMap; -use ruma::{events::relation::Annotation, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId}; - -use super::event_item::TimelineEventItemId; - -// Implements hash etc -#[derive(Clone, Hash, PartialEq, Eq, Debug)] -pub(super) struct AnnotationKey { - event_id: OwnedEventId, - key: String, -} - -impl From<&Annotation> for AnnotationKey { - fn from(annotation: &Annotation) -> Self { - Self { event_id: annotation.event_id.clone(), key: annotation.key.clone() } - } -} - -#[derive(Clone, Debug)] -pub(crate) struct PendingReaction { - /// The annotation used for the reaction. - pub key: String, - /// Sender identifier. - pub sender_id: OwnedUserId, - /// Date at which the sender reacted. - pub timestamp: MilliSecondsSinceUnixEpoch, -} - -#[derive(Clone, Debug)] -pub(crate) struct FullReactionKey { - pub item: TimelineEventItemId, - pub key: String, - pub sender: OwnedUserId, -} - -#[derive(Clone, Debug, Default)] -pub(super) struct Reactions { - /// Reaction event / txn ID => full path to the reaction in some item. - pub map: HashMap, - /// Mapping of events that are not in the timeline => reaction event id => - /// pending reaction. - pub pending: HashMap>, -} - -impl Reactions { - pub(super) fn clear(&mut self) { - self.map.clear(); - self.pending.clear(); - } -} diff --git a/crates/matrix-sdk-ui/src/timeline/tests/event_filter.rs b/crates/matrix-sdk-ui/src/timeline/tests/event_filter.rs index da1d7d40062..7c262f9bdfc 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/event_filter.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/event_filter.rs @@ -83,7 +83,7 @@ async fn test_default_filter() { timeline.handle_live_event(f.reaction(third_event_id, "+1").sender(&BOB)).await; timeline.handle_live_event(f.redaction(second_event_id).sender(&BOB)).await; let item = assert_next_matches!(stream, VectorDiff::Set { index: 3, value } => value); - assert_eq!(item.as_event().unwrap().reactions().len(), 1); + assert_eq!(item.as_event().unwrap().content().reactions().len(), 1); // TODO: After adding raw timeline items, check for one here. diff --git a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs index a217da4ec14..4771f1fdbe0 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/mod.rs @@ -78,8 +78,10 @@ mod redaction; mod shields; mod virt; +/// A timeline instance used only for testing purposes in unit tests. struct TestTimeline { controller: TimelineController, + /// An [`EventFactory`] that can be used for creating events in this /// timeline. pub factory: EventFactory, @@ -237,6 +239,14 @@ impl TestTimeline { async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) { self.controller.handle_room_send_queue_update(update).await } + + async fn handle_event_update( + &self, + diffs: Vec>, + origin: RemoteEventOrigin, + ) { + self.controller.handle_remote_events_with_diffs(diffs, origin).await; + } } type ReadReceiptMap = diff --git a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs index ac1e8e29d7b..3ab5c34d268 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/reactions.rs @@ -19,13 +19,13 @@ use eyeball_im::VectorDiff; use futures_core::Stream; use futures_util::{FutureExt as _, StreamExt as _}; use imbl::vector; -use matrix_sdk::deserialized_responses::TimelineEvent; -use matrix_sdk_test::{async_test, event_factory::EventFactory, sync_timeline_event, ALICE, BOB}; +use matrix_sdk::assert_next_matches_with_timeout; +use matrix_sdk_test::{async_test, ALICE, BOB}; use ruma::{ event_id, events::AnyMessageLikeEventContent, server_name, uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, }; -use stream_assert::assert_next_matches; +use stream_assert::{assert_next_matches, assert_pending}; use tokio::time::timeout; use crate::timeline::{ @@ -67,7 +67,8 @@ macro_rules! assert_item_update { macro_rules! assert_reaction_is_updated { ($stream:expr, $event_id:expr, $index:expr, $is_remote_echo:literal) => {{ let event = assert_item_update!($stream, $event_id, $index); - let reactions = event.reactions().get(&REACTION_KEY.to_owned()).unwrap(); + let reactions = event.content().reactions(); + let reactions = reactions.get(&REACTION_KEY.to_owned()).unwrap(); let reaction = reactions.get(*ALICE).unwrap(); match reaction.status { ReactionStatus::LocalToRemote(_) | ReactionStatus::LocalToLocal(_) => { @@ -113,8 +114,9 @@ async fn test_add_reaction_success() { } // When the remote echo is received from sync, - let f = EventFactory::new(); - timeline.handle_live_event(f.reaction(&event_id, REACTION_KEY).sender(*ALICE)).await; + timeline + .handle_live_event(timeline.factory.reaction(&event_id, REACTION_KEY).sender(*ALICE)) + .await; // The reaction is still present on the item, as a remote echo. assert_reaction_is_updated!(stream, &event_id, item_pos, true); @@ -142,7 +144,8 @@ async fn test_redact_reaction_success() { // Will immediately redact it on the item. let event = assert_item_update!(stream, &event_id, item_pos); - assert!(event.reactions().get(&REACTION_KEY.to_owned()).is_none()); + assert!(event.content().reactions().get(&REACTION_KEY.to_owned()).is_none()); + // And send a redaction request for that reaction. { let redacted_events = &timeline.data().redacted.read().await; @@ -152,17 +155,11 @@ async fn test_redact_reaction_success() { // When that redaction is confirmed by the server, timeline - .handle_live_event(TimelineEvent::new(sync_timeline_event!({ - "sender": *ALICE, - "type": "m.room.redaction", - "event_id": "$idb", - "redacts": reaction_id, - "origin_server_ts": 12344448, - "content": {}, - }))) + .handle_live_event(f.redaction(reaction_id).sender(*ALICE).event_id(event_id!("$idb"))) .await; - assert!(stream.next().now_or_never().is_none()); + // Nothing happens, because the reaction was already redacted. + assert_pending!(stream); } #[async_test] @@ -177,7 +174,8 @@ async fn test_reactions_store_timestamp() { timeline.toggle_reaction_local(&item_id, REACTION_KEY).await.unwrap(); let event = assert_reaction_is_updated!(stream, &event_id, msg_pos, false); - let reactions = event.reactions().get(&REACTION_KEY.to_owned()).unwrap(); + let reactions = event.content().reactions(); + let reactions = reactions.get(&REACTION_KEY.to_owned()).unwrap(); let timestamp = reactions.values().next().unwrap().timestamp; let now = MilliSecondsSinceUnixEpoch::now(); @@ -188,7 +186,7 @@ async fn test_reactions_store_timestamp() { async fn test_initial_reaction_timestamp_is_stored() { let timeline = TestTimeline::new(); - let f = EventFactory::new().sender(*ALICE); + let f = &timeline.factory; let message_event_id = EventId::new(server_name!("dummy.server")); let reaction_timestamp = MilliSecondsSinceUnixEpoch(uint!(39845)); @@ -199,10 +197,11 @@ async fn test_initial_reaction_timestamp_is_stored() { values: vector![ // Reaction comes first. f.reaction(&message_event_id, REACTION_KEY) + .sender(*ALICE) .server_ts(reaction_timestamp) .into_event(), // Event comes next. - f.text_msg("A").event_id(&message_event_id).into_event(), + f.text_msg("A").sender(*ALICE).event_id(&message_event_id).into_event(), ], }], RemoteEventOrigin::Sync, @@ -210,7 +209,7 @@ async fn test_initial_reaction_timestamp_is_stored() { .await; let items = timeline.controller.items().await; - let reactions = items.last().unwrap().as_event().unwrap().reactions(); + let reactions = items.last().unwrap().as_event().unwrap().content().reactions(); let entry = reactions.get(&REACTION_KEY.to_owned()).unwrap(); assert_eq!(entry.values().next().unwrap().timestamp, reaction_timestamp); @@ -234,3 +233,73 @@ async fn send_first_message( (item_id, event_id, position) } + +#[async_test] +async fn test_reinserted_item_keeps_reactions() { + // This test checks that after deduplicating events, the reactions attached to + // the deduplicated event are not lost. + let timeline = TestTimeline::new(); + let f = &timeline.factory; + + // We receive an initial update with one event and a reaction to this event. + let reaction_target = event_id!("$1"); + let target_event = f.text_msg("hey").sender(&BOB).event_id(reaction_target).into_event(); + let reaction_event = f + .reaction(reaction_target, REACTION_KEY) + .sender(&ALICE) + .event_id(event_id!("$2")) + .into_event(); + + let mut stream = timeline.subscribe_events().await; + + timeline + .handle_event_update( + vec![VectorDiff::Append { values: vector![target_event.clone(), reaction_event] }], + RemoteEventOrigin::Sync, + ) + .await; + + // Get the event. + assert_next_matches_with_timeout!(stream, VectorDiff::PushBack { value: item } => { + assert_eq!(item.content().as_message().unwrap().body(), "hey"); + assert!(item.content().reactions().is_empty()); + }); + + // Get the reaction. + assert_next_matches_with_timeout!(stream, VectorDiff::Set { index: 0, value: item } => { + assert_eq!(item.content().as_message().unwrap().body(), "hey"); + let reactions = item.content().reactions(); + assert_eq!(reactions.len(), 1); + reactions.get(REACTION_KEY).unwrap().get(*ALICE).unwrap(); + }); + + // And that's it for now. + assert_pending!(stream); + + // Then the event is removed and reinserted. This sequences of update is + // possible if the event cache decided to deduplicate a given event. + timeline + .handle_event_update( + vec![ + VectorDiff::Remove { index: 0 }, + VectorDiff::Insert { index: 0, value: target_event }, + ], + RemoteEventOrigin::Sync, + ) + .await; + + // The duplicate event is removed… + assert_next_matches_with_timeout!(stream, VectorDiff::Remove { index: 0 }); + + // …And reinserted. + assert_next_matches_with_timeout!(stream, VectorDiff::Insert { index: 0, value: item } => { + assert_eq!(item.content().as_message().unwrap().body(), "hey"); + // And it still includes the reaction from Alice. + let reactions = item.content().reactions(); + assert_eq!(reactions.len(), 1); + reactions.get(REACTION_KEY).unwrap().get(*ALICE).unwrap(); + }); + + // No other updates. + assert_pending!(stream); +} diff --git a/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs b/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs index 19977b98d97..5c0a8c8e22d 100644 --- a/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs +++ b/crates/matrix-sdk-ui/src/timeline/tests/redaction.rs @@ -103,13 +103,13 @@ async fn test_reaction_redaction() { timeline.handle_live_event(f.text_msg("hi!").sender(&ALICE)).await; let item = assert_next_matches!(stream, VectorDiff::PushBack { value } => value); - assert_eq!(item.reactions().len(), 0); + assert_eq!(item.content().reactions().len(), 0); let msg_event_id = item.event_id().unwrap(); timeline.handle_live_event(f.reaction(msg_event_id, "+1").sender(&BOB)).await; let item = assert_next_matches!(stream, VectorDiff::Set { index: 0, value } => value); - assert_eq!(item.reactions().len(), 1); + assert_eq!(item.content().reactions().len(), 1); // TODO: After adding raw timeline items, check for one here @@ -117,7 +117,7 @@ async fn test_reaction_redaction() { timeline.handle_live_event(f.redaction(reaction_event_id).sender(&BOB)).await; let item = assert_next_matches!(stream, VectorDiff::Set { index: 0, value } => value); - assert_eq!(item.reactions().len(), 0); + assert_eq!(item.content().reactions().len(), 0); } #[async_test] @@ -161,6 +161,6 @@ async fn test_reaction_redaction_timeline_filter() { // Redacting the reaction doesn't add a timeline item. timeline.handle_live_event(f.redaction(reaction_event_id).sender(&BOB)).await; let item = assert_next_matches!(stream, VectorDiff::Set { index: 0, value } => value); - assert_eq!(item.reactions().len(), 0); + assert_eq!(item.content().reactions().len(), 0); assert_eq!(timeline.controller.items().await.len(), 2); } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs b/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs index f136a76bc15..654b1a4467e 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/focus_event.rs @@ -236,7 +236,7 @@ async fn test_focused_timeline_reacts() { let event_item = items[1].as_event().unwrap(); assert_eq!(event_item.content().as_message().unwrap().body(), "yolo"); - assert_eq!(event_item.reactions().len(), 0); + assert_eq!(event_item.content().reactions().len(), 0); assert_pending!(timeline_stream); @@ -263,7 +263,7 @@ async fn test_focused_timeline_reacts() { // Text hasn't changed. assert_eq!(event_item.content().as_message().unwrap().body(), "yolo"); // But now there's one reaction to the event. - assert_eq!(event_item.reactions().len(), 1); + assert_eq!(event_item.content().reactions().len(), 1); // And nothing more. assert_pending!(timeline_stream); @@ -321,7 +321,7 @@ async fn test_focused_timeline_local_echoes() { let event_item = items[1].as_event().unwrap(); assert_eq!(event_item.content().as_message().unwrap().body(), "yolo"); - assert_eq!(event_item.reactions().len(), 0); + assert_eq!(event_item.content().reactions().len(), 0); sleep(Duration::from_millis(100)).await; assert_pending!(timeline_stream); @@ -339,7 +339,7 @@ async fn test_focused_timeline_local_echoes() { // Text hasn't changed. assert_eq!(event_item.content().as_message().unwrap().body(), "yolo"); // But now there's one reaction to the event. - let reactions = event_item.reactions(); + let reactions = event_item.content().reactions(); assert_eq!(reactions.len(), 1); assert!(reactions.get("✨").unwrap().get(client.user_id().unwrap()).is_some()); @@ -400,7 +400,7 @@ async fn test_focused_timeline_doesnt_show_local_echoes() { let event_item = items[1].as_event().unwrap(); assert_eq!(event_item.content().as_message().unwrap().body(), "yolo"); - assert_eq!(event_item.reactions().len(), 0); + assert_eq!(event_item.content().reactions().len(), 0); assert_pending!(timeline_stream); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs index d552b4d38ff..e3cec52778d 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/media.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/media.rs @@ -280,7 +280,7 @@ async fn test_react_to_local_media() { assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", None)); // The item starts with no reactions. - assert!(item.reactions().is_empty()); + assert!(item.content().reactions().is_empty()); item.identifier() }; @@ -293,7 +293,7 @@ async fn test_react_to_local_media() { assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", None)); // There's a reaction for the current user for the given emoji. - let reactions = item.reactions(); + let reactions = item.content().reactions(); let own_user_id = client.user_id().unwrap(); reactions.get("🤪").unwrap().get(own_user_id).unwrap(); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs index 1206690c4a2..0db4246f114 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs @@ -130,7 +130,7 @@ async fn test_reaction() { assert_let!(TimelineItemContent::Message(msg) = event_item.content()); assert!(!msg.is_edited()); assert_eq!(event_item.read_receipts().len(), 2); - assert_eq!(event_item.reactions().len(), 0); + assert_eq!(event_item.content().reactions().len(), 0); // Then the reaction is taken into account. assert_let!(VectorDiff::Set { index: 0, value: updated_message } = &timeline_updates[2]); @@ -138,8 +138,8 @@ async fn test_reaction() { assert_let!(TimelineItemContent::Message(msg) = event_item.content()); assert!(!msg.is_edited()); assert_eq!(event_item.read_receipts().len(), 2); - assert_eq!(event_item.reactions().len(), 1); - let group = &event_item.reactions()["👍"]; + assert_eq!(event_item.content().reactions().len(), 1); + let group = &event_item.content().reactions()["👍"]; assert_eq!(group.len(), 1); let senders: Vec<_> = group.keys().collect(); assert_eq!(senders.as_slice(), [user_id!("@bob:example.org")]); @@ -170,7 +170,7 @@ async fn test_reaction() { let event_item = updated_message.as_event().unwrap(); assert_let!(TimelineItemContent::Message(msg) = event_item.content()); assert!(!msg.is_edited()); - assert_eq!(event_item.reactions().len(), 0); + assert_eq!(event_item.content().reactions().len(), 0); assert_pending!(timeline_stream); } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs index 4edf2182119..d3bca1c1470 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/reactions.rs @@ -19,7 +19,7 @@ use eyeball_im::VectorDiff; use futures_util::StreamExt as _; use matrix_sdk::{assert_let_timeout, test_utils::mocks::MatrixMockServer}; use matrix_sdk_test::{async_test, event_factory::EventFactory, JoinedRoomBuilder, ALICE}; -use matrix_sdk_ui::timeline::{ReactionStatus, RoomExt as _}; +use matrix_sdk_ui::timeline::{EventSendState, ReactionStatus, RoomExt as _}; use ruma::{event_id, events::room::message::RoomMessageEventContent, room_id}; use serde_json::json; use stream_assert::assert_pending; @@ -118,7 +118,7 @@ async fn test_abort_before_being_sent() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); + let reactions = item.as_event().unwrap().content().reactions(); assert_eq!(reactions.len(), 1); assert_matches!( &reactions.get("👍").unwrap().get(user_id).unwrap().status, @@ -138,7 +138,7 @@ async fn test_abort_before_being_sent() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); + let reactions = item.as_event().unwrap().content().reactions(); assert_eq!(reactions.len(), 2); assert_matches!( &reactions.get("👍").unwrap().get(user_id).unwrap().status, @@ -162,7 +162,7 @@ async fn test_abort_before_being_sent() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); + let reactions = item.as_event().unwrap().content().reactions(); assert_eq!(reactions.len(), 1); assert_matches!( &reactions.get("🥰").unwrap().get(user_id).unwrap().status, @@ -182,7 +182,7 @@ async fn test_abort_before_being_sent() { assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); + let reactions = item.as_event().unwrap().content().reactions(); assert!(reactions.is_empty()); assert_pending!(stream); @@ -237,13 +237,13 @@ async fn test_redact_failed() { let item = item.as_event().unwrap(); assert_eq!(item.content().as_message().unwrap().body(), "hello"); - assert!(item.reactions().is_empty()); + assert!(item.content().reactions().is_empty()); item.identifier() }; assert_let!(VectorDiff::Set { index: 0, value: item } = &timeline_updates[1]); - assert_eq!(item.as_event().unwrap().reactions().len(), 1); + assert_eq!(item.as_event().unwrap().content().reactions().len(), 1); assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[2]); assert!(date_divider.is_date_divider()); @@ -259,11 +259,11 @@ async fn test_redact_failed() { // The local echo is removed (assuming the redaction works)… assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - assert!(item.as_event().unwrap().reactions().is_empty()); + assert!(item.as_event().unwrap().content().reactions().is_empty()); // …then added back, after redaction failed. assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[1]); - assert_eq!(item.as_event().unwrap().reactions().len(), 1); + assert_eq!(item.as_event().unwrap().content().reactions().len(), 1); tokio::time::sleep(Duration::from_millis(150)).await; assert_pending!(stream); @@ -314,83 +314,119 @@ async fn test_local_reaction_to_local_echo() { // Send a local event. let _ = timeline.send(RoomMessageEventContent::text_plain("lol").into()).await.unwrap(); - assert_let!(Some(timeline_updates) = stream.next().await); - assert_eq!(timeline_updates.len(), 2); - - // Receive a local echo. - let item_id = { + assert_let_timeout!(Some(timeline_updates) = stream.next()); + assert_eq!(timeline_updates.len(), 2); + + // Receive a local echo. assert_let!(VectorDiff::PushBack { value: item } = &timeline_updates[0]); let item = item.as_event().unwrap(); assert!(item.is_local_echo()); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + assert_eq!(item.content().as_message().unwrap().body(), "lol"); - assert!(item.reactions().is_empty()); + assert!(item.content().reactions().is_empty()); + + // Good ol' date divider. + assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); + assert!(date_divider.is_date_divider()); + + assert_pending!(stream); + item.identifier() }; - // Good ol' date divider. - assert_let!(VectorDiff::PushFront { value: date_divider } = &timeline_updates[1]); - assert!(date_divider.is_date_divider()); - // Add a reaction before the remote echo comes back. let key1 = "🤣"; timeline.toggle_reaction(&item_id, key1).await.unwrap(); - assert_let!(Some(timeline_updates) = stream.next().await); - assert_eq!(timeline_updates.len(), 1); + { + assert_let_timeout!(Some(timeline_updates) = stream.next()); + assert_eq!(timeline_updates.len(), 1); - // The reaction is added to the local echo. - assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); - assert_eq!(reactions.len(), 1); - let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); - assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + // The reaction is added to the local echo. + assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); + let item = item.as_event().unwrap(); + assert!(item.is_local_echo()); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + + let reactions = item.content().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + + assert_pending!(stream); + } // Add another reaction. let key2 = "😈"; timeline.toggle_reaction(&item_id, key2).await.unwrap(); - assert_let!(Some(timeline_updates) = stream.next().await); - assert_eq!(timeline_updates.len(), 1); + { + assert_let_timeout!(Some(timeline_updates) = stream.next()); + assert_eq!(timeline_updates.len(), 1); - // Also comes as a local echo. - assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); - assert_eq!(reactions.len(), 2); - let reaction_info = reactions.get(key2).unwrap().get(user_id).unwrap(); - assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + // Also comes as a local echo. + assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); + let item = item.as_event().unwrap(); + assert!(item.is_local_echo()); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); + + let reactions = item.content().reactions(); + assert_eq!(reactions.len(), 2); + let reaction_info = reactions.get(key2).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + + assert_pending!(stream); + } // Remove second reaction. It's immediately removed, since it was a local echo, // and it wasn't being sent. timeline.toggle_reaction(&item_id, key2).await.unwrap(); - assert_let!(Some(timeline_updates) = stream.next().await); - assert_eq!(timeline_updates.len(), 1); + { + assert_let_timeout!(Some(timeline_updates) = stream.next()); + assert_eq!(timeline_updates.len(), 1); - assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); - assert_eq!(reactions.len(), 1); - let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); - assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); + let item = item.as_event().unwrap(); + assert!(item.is_local_echo()); + assert_matches!(item.send_state(), Some(EventSendState::NotSentYet)); - assert_let!(Some(timeline_updates) = stream.next().await); - assert_eq!(timeline_updates.len(), 1); + let reactions = item.content().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + + assert_pending!(stream); + } // Now, wait for the remote echo for the message itself. - assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); - assert_eq!(reactions.len(), 1); - let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); - // TODO(bnjbvr): why not LocalToRemote here? - assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + { + assert_let_timeout!(Duration::from_secs(2), Some(timeline_updates) = stream.next()); + assert_eq!(timeline_updates.len(), 1); - assert_let!(Some(timeline_updates) = stream.next().await); + assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); + let item = item.as_event().unwrap(); + + // Still a local echo, but now has a send state set to Sent. + assert!(item.is_local_echo()); + assert_matches!(item.send_state(), Some(EventSendState::Sent { .. })); + + let reactions = item.content().reactions(); + assert_eq!(reactions.len(), 1); + let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); + // TODO(bnjbvr): why not LocalToRemote here? + assert_matches!(&reaction_info.status, ReactionStatus::LocalToLocal(..)); + } + + assert_let_timeout!(Some(timeline_updates) = stream.next()); assert_eq!(timeline_updates.len(), 1); // And then the remote echo for the reaction itself. assert_let!(VectorDiff::Set { index: 1, value: item } = &timeline_updates[0]); - let reactions = item.as_event().unwrap().reactions(); + let reactions = item.as_event().unwrap().content().reactions(); assert_eq!(reactions.len(), 1); let reaction_info = reactions.get(key1).unwrap().get(user_id).unwrap(); assert_matches!(&reaction_info.status, ReactionStatus::RemoteToRemote(..)); diff --git a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs index f38826baa16..9bb28f4302a 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/timeline.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/timeline.rs @@ -182,7 +182,8 @@ async fn test_toggling_reaction() -> Result<()> { // Local echo is added. { let event = assert_event_is_updated!(timeline_updates[0], event_id, message_position); - let reactions = event.reactions().get(&reaction_key).unwrap(); + let reactions = event.content().reactions(); + let reactions = reactions.get(&reaction_key).unwrap(); let reaction = reactions.get(&user_id).unwrap(); assert_matches!(reaction.status, ReactionStatus::LocalToRemote(..)); } @@ -191,7 +192,8 @@ async fn test_toggling_reaction() -> Result<()> { { let event = assert_event_is_updated!(timeline_updates[1], event_id, message_position); - let reactions = event.reactions().get(&reaction_key).unwrap(); + let reactions = event.content().reactions(); + let reactions = reactions.get(&reaction_key).unwrap(); assert_eq!(reactions.keys().count(), 1); let reaction = reactions.get(&user_id).unwrap(); @@ -217,7 +219,7 @@ async fn test_toggling_reaction() -> Result<()> { // The reaction is removed. let event = assert_event_is_updated!(timeline_updates[0], event_id, message_position); - assert!(event.reactions().is_empty()); + assert!(event.content().reactions().is_empty()); assert_pending!(stream); }