Skip to content

Commit ddbfc98

Browse files
committed
WIP, hack, blood and tears
1 parent 2344f00 commit ddbfc98

File tree

7 files changed

+134
-49
lines changed

7 files changed

+134
-49
lines changed

crates/matrix-sdk-base/src/event_cache/store/memory_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl EventCacheStore for MemoryStore {
8888
async fn reload_linked_chunk(
8989
&self,
9090
_room_id: &RoomId,
91-
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error> {
91+
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error> {
9292
// TODO(hywan)
9393
Ok(Default::default())
9494
}

crates/matrix-sdk-base/src/event_cache/store/traits.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub trait EventCacheStore: AsyncTraitDeps {
6363
async fn reload_linked_chunk(
6464
&self,
6565
room_id: &RoomId,
66-
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error>;
66+
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error>;
6767

6868
/// Add a media file's content in the media store.
6969
///
@@ -174,7 +174,7 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {
174174
async fn reload_linked_chunk(
175175
&self,
176176
room_id: &RoomId,
177-
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error> {
177+
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error> {
178178
self.0.reload_linked_chunk(room_id).await.map_err(Into::into)
179179
}
180180

crates/matrix-sdk-sqlite/migrations/event_cache_store/003_events.sql

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ CREATE TABLE "linked_chunks" (
22
-- Identifier of the chunk, unique per room.
33
"id" INTEGER,
44
-- Which room does this chunk belong to? (hashed key shared with the two other tables)
5-
"room_id" BLOB NO NULL,
5+
"room_id" BLOB NOT NULL,
66

77
-- Previous chunk in the linked list.
88
"previous" INTEGER,
@@ -21,10 +21,10 @@ CREATE TABLE "gaps" (
2121
-- Which chunk does this gap refer to?
2222
"chunk_id" INTEGER NOT NULL,
2323
-- Which room does this event belong to? (hashed key shared with linked_chunks)
24-
"room_id" BLOB NO NULL,
24+
"room_id" BLOB NOT NULL,
2525

2626
-- The previous batch token of a gap (encrypted value).
27-
"prev_token" TEXT NOT NULL,
27+
"prev_token" BLOB NOT NULL,
2828

2929
-- If the owning chunk gets deleted, delete the entry too.
3030
FOREIGN KEY(chunk_id, room_id) REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE
@@ -35,14 +35,14 @@ CREATE TABLE "events" (
3535
-- Which chunk does this event refer to?
3636
"chunk_id" INTEGER NOT NULL,
3737
-- Which room does this event belong to? (hashed key shared with linked_chunks)
38-
"room_id" BLOB NO NULL,
38+
"room_id" BLOB NOT NULL,
3939

4040
-- `OwnedEventId` for events, can be null if malformed.
4141
"event_id" TEXT,
4242
-- JSON serialized `Raw<AnySyncTimelineEvent>` (encrypted value).
4343
"raw" BLOB NOT NULL,
44-
-- Index (position) in the chunk.
45-
"index" INTEGER NOT NULL,
44+
-- Position (index) in the chunk.
45+
"position" INTEGER NOT NULL,
4646

4747
-- If the owning chunk gets deleted, delete the entry too.
4848
FOREIGN KEY(chunk_id, room_id) REFERENCES linked_chunks(id, room_id) ON DELETE CASCADE

crates/matrix-sdk-sqlite/src/event_cache_store.rs

Lines changed: 74 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use matrix_sdk_store_encryption::StoreCipher;
1515
use ruma::{MilliSecondsSinceUnixEpoch, RoomId};
1616
use rusqlite::{OptionalExtension, Transaction};
1717
use tokio::fs;
18-
use tracing::debug;
18+
use tracing::{debug, trace};
1919

2020
use crate::{
2121
error::{Error, Result},
@@ -201,20 +201,35 @@ impl EventCacheStore for SqliteEventCacheStore {
201201
room_id: &RoomId,
202202
updates: &[Update<Event, Gap>],
203203
) -> Result<(), Self::Error> {
204-
let room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
204+
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
205205

206206
for up in updates {
207207
match up {
208208
Update::NewItemsChunk { previous, new, next } => {
209209
let new = new.clone();
210210
let previous = previous.clone();
211211
let next = next.clone();
212-
let room_id = room_id.clone();
212+
let hashed_room_id = hashed_room_id.clone();
213+
214+
trace!(
215+
%room_id,
216+
"new events chunk (prev={:?}, i={}, next={:?})",
217+
previous.as_ref().map(ChunkIdentifier::index),
218+
new.index(),
219+
next.as_ref().map(ChunkIdentifier::index)
220+
);
213221

214222
self.acquire()
215223
.await?
216224
.with_transaction(move |txn| {
217-
insert_chunk(txn, &room_id, previous.as_ref(), new, next.as_ref(), "E")
225+
insert_chunk(
226+
txn,
227+
&hashed_room_id,
228+
previous.as_ref(),
229+
new,
230+
next.as_ref(),
231+
"E",
232+
)
218233
})
219234
.await?;
220235
}
@@ -223,17 +238,26 @@ impl EventCacheStore for SqliteEventCacheStore {
223238
let new = new.clone();
224239
let previous = previous.clone();
225240
let next = next.clone();
226-
let room_id = room_id.clone();
241+
let hashed_room_id = hashed_room_id.clone();
242+
243+
let serialized = serde_json::to_vec(&gap.prev_token)?;
244+
let prev_token = self.encode_value(serialized)?;
227245

228-
let prev_token = self.encode_value(gap.prev_token.clone().into_bytes())?;
246+
trace!(
247+
%room_id,
248+
"new gap chunk (prev={:?}, i={}, next={:?})",
249+
previous.as_ref().map(ChunkIdentifier::index),
250+
new.index(),
251+
next.as_ref().map(ChunkIdentifier::index)
252+
);
229253

230254
self.acquire()
231255
.await?
232256
.with_transaction(move |txn| -> rusqlite::Result<()> {
233257
// Insert the chunk as a gap.
234258
insert_chunk(
235259
txn,
236-
&room_id,
260+
&hashed_room_id,
237261
previous.as_ref(),
238262
new,
239263
next.as_ref(),
@@ -248,7 +272,7 @@ impl EventCacheStore for SqliteEventCacheStore {
248272
INSERT INTO gaps(chunk_id, room_id, prev_token)
249273
VALUES (?, ?, ?)
250274
"#,
251-
(new.index(), room_id, prev_token),
275+
(new.index(), hashed_room_id, prev_token),
252276
)?;
253277

254278
Ok(())
@@ -257,32 +281,34 @@ impl EventCacheStore for SqliteEventCacheStore {
257281
}
258282

259283
Update::RemoveChunk(chunk_identifier) => {
260-
let room_id = room_id.clone();
284+
let hashed_room_id = hashed_room_id.clone();
261285
let chunk_id = chunk_identifier.index();
262286

287+
trace!(%room_id, "removing chunk @ {chunk_id}");
288+
263289
self.acquire()
264290
.await?
265291
.with_transaction(move |txn| -> rusqlite::Result<()> {
266292
// Find chunk to delete.
267293
let (previous, next): (Option<usize>, Option<usize>) = txn.query_row(
268294
"SELECT previous, next FROM linked_chunks WHERE id = ? AND room_id = ?",
269-
(chunk_id, &room_id),
295+
(chunk_id, &hashed_room_id),
270296
|row| Ok((row.get(0)?, row.get(1)?))
271297
)?;
272298

273299
// Replace its previous' next to its own next.
274300
if let Some(previous) = previous {
275-
txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &room_id))?;
301+
txn.execute("UPDATE linked_chunks SET next = ? WHERE id = ? AND room_id = ?", (next, previous, &hashed_room_id))?;
276302
}
277303

278304
// Replace its next' previous to its own previous.
279305
if let Some(next) = next {
280-
txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &room_id))?;
306+
txn.execute("UPDATE linked_chunks SET previous = ? WHERE id = ? AND room_id = ?", (previous, next, &hashed_room_id))?;
281307
}
282308

283309
// Now delete it, and let cascading delete corresponding entries in the
284310
// other data tables.
285-
txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, room_id))?;
311+
txn.execute("DELETE FROM linked_chunks WHERE id = ? AND room_id = ?", (chunk_id, hashed_room_id))?;
286312

287313
Ok(())
288314
})
@@ -291,7 +317,9 @@ impl EventCacheStore for SqliteEventCacheStore {
291317

292318
Update::PushItems { at, items } => {
293319
let chunk_id = at.chunk_identifier().index();
294-
let room_id = room_id.clone();
320+
let hashed_room_id = hashed_room_id.clone();
321+
322+
trace!(%room_id, "pushing items @ {chunk_id}");
295323

296324
let entries = items
297325
.into_iter()
@@ -301,7 +329,7 @@ impl EventCacheStore for SqliteEventCacheStore {
301329
let raw = self.encode_value(serialized)?;
302330
let event_id = event.event_id().map(|event_id| event_id.to_string());
303331
let index = at.index() + i;
304-
Ok((chunk_id, event_id, raw, index))
332+
Ok((event_id, raw, index))
305333
})
306334
.collect::<Result<Vec<_>, _>>()?;
307335

@@ -311,10 +339,10 @@ impl EventCacheStore for SqliteEventCacheStore {
311339
for entry in entries {
312340
txn.execute(
313341
r#"
314-
INSERT INTO events(chunk_id, room_id, event_id, raw, index)
315-
VALUES (?, ?, ?, ?)
342+
INSERT INTO events(chunk_id, room_id, event_id, raw, position)
343+
VALUES (?, ?, ?, ?, ?)
316344
"#,
317-
(entry.0, &room_id, entry.1, entry.2, entry.3),
345+
(chunk_id, &hashed_room_id, entry.0, entry.1, entry.2),
318346
)?;
319347
}
320348

@@ -324,25 +352,27 @@ impl EventCacheStore for SqliteEventCacheStore {
324352
}
325353

326354
Update::RemoveItem { at } => {
327-
let room_id = room_id.clone();
355+
let hashed_room_id = hashed_room_id.clone();
328356
let chunk_id = at.chunk_identifier().index();
329357
let index = at.index();
330358

359+
trace!(%room_id, "removing item @ {chunk_id}:{index}");
360+
331361
self.acquire()
332362
.await?
333363
.with_transaction(move |txn| -> rusqlite::Result<()> {
334364
// Remove the entry.
335-
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND index = ?", (&room_id, chunk_id, index))?;
365+
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position = ?", (&hashed_room_id, chunk_id, index))?;
336366

337367
// Decrement the index of each item after the one we're going to
338368
// remove.
339369
txn.execute(
340370
r#"
341371
UPDATE events
342-
SET index = index - 1
343-
WHERE room_id = ? AND chunk_id = ? AND index > ?
372+
SET position = position - 1
373+
WHERE room_id = ? AND chunk_id = ? AND position > ?
344374
"#,
345-
(&room_id, chunk_id, index)
375+
(&hashed_room_id, chunk_id, index)
346376
)?;
347377

348378
Ok(())
@@ -351,15 +381,17 @@ impl EventCacheStore for SqliteEventCacheStore {
351381
}
352382

353383
Update::DetachLastItems { at } => {
354-
let room_id = room_id.clone();
384+
let hashed_room_id = hashed_room_id.clone();
355385
let chunk_id = at.chunk_identifier().index();
356386
let index = at.index();
357387

388+
trace!(%room_id, "truncating items >= {chunk_id}:{index}");
389+
358390
self.acquire()
359391
.await?
360392
.with_transaction(move |txn| -> rusqlite::Result<()> {
361393
// Remove these entries.
362-
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND index >= ?", (&room_id, chunk_id, index))?;
394+
txn.execute("DELETE FROM events WHERE room_id = ? AND chunk_id = ? AND position >= ?", (&hashed_room_id, chunk_id, index))?;
363395
Ok(())
364396
})
365397
.await?;
@@ -391,8 +423,11 @@ impl EventCacheStore for SqliteEventCacheStore {
391423
async fn reload_linked_chunk(
392424
&self,
393425
room_id: &RoomId,
394-
) -> Result<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>, Self::Error> {
395-
let room_id = self.encode_key(keys::LINKED_CHUNKS, room_id);
426+
) -> Result<Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>, Self::Error> {
427+
let room_id = room_id.to_owned();
428+
let hashed_room_id = self.encode_key(keys::LINKED_CHUNKS, &room_id);
429+
430+
let this = self.clone();
396431

397432
let result = self
398433
.acquire()
@@ -404,7 +439,7 @@ impl EventCacheStore for SqliteEventCacheStore {
404439
.prepare(
405440
"SELECT id, previous, next, type FROM linked_chunks WHERE room_id = ?",
406441
)?
407-
.query_map((&room_id,), |row| {
442+
.query_map((&hashed_room_id,), |row| {
408443
Ok((
409444
row.get::<_, u64>(0)?,
410445
row.get::<_, Option<u64>>(1)?,
@@ -415,15 +450,19 @@ impl EventCacheStore for SqliteEventCacheStore {
415450
{
416451
let (id, previous, next, chunk_type) = data?;
417452

453+
trace!(%room_id, "reloaded chunk {id} of type {chunk_type}");
454+
418455
match chunk_type.as_str() {
419456
"G" => {
420457
// It's a gap! There's at most one row for it in the database, so a
421458
// call to `query_row` is sufficient.
422-
let prev_token: String = txn.query_row(
459+
let encoded_prev_token: Vec<u8> = txn.query_row(
423460
"SELECT prev_token FROM gaps WHERE chunk_id = ? AND room_id = ?",
424-
(id, &room_id),
461+
(id, &hashed_room_id),
425462
|row| row.get(0),
426463
)?;
464+
let prev_token_bytes = this.decode_value(&encoded_prev_token)?;
465+
let prev_token = serde_json::from_slice(&prev_token_bytes)?;
427466

428467
let previous = previous.map(ChunkIdentifier::from_raw);
429468
let next = next.map(ChunkIdentifier::from_raw);
@@ -444,12 +483,13 @@ impl EventCacheStore for SqliteEventCacheStore {
444483
r#"
445484
SELECT raw FROM events
446485
WHERE chunk_id = ? AND room_id = ?
447-
ORDER BY index ASC
486+
ORDER BY position ASC
448487
"#,
449488
)?
450-
.query_map((id, &room_id), |row| row.get::<_, Vec<u8>>(0))?
489+
.query_map((id, &hashed_room_id), |row| row.get::<_, Vec<u8>>(0))?
451490
{
452-
let raw = event_data?;
491+
let encoded_raw = event_data?;
492+
let raw = this.decode_value(&encoded_raw)?;
453493
let raw_event = serde_json::from_slice(&raw)?;
454494

455495
// TODO: keep encryption information around!
@@ -473,7 +513,7 @@ impl EventCacheStore for SqliteEventCacheStore {
473513

474514
builder.set_observable();
475515

476-
Ok(builder.build().unwrap_or_default())
516+
Ok(builder.build())
477517
})
478518
.await?;
479519

crates/matrix-sdk/src/event_cache/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use tokio::sync::{
5151
broadcast::{error::RecvError, Receiver},
5252
Mutex, RwLock,
5353
};
54-
use tracing::{error, info_span, instrument, trace, warn, Instrument as _, Span};
54+
use tracing::{debug, error, info_span, instrument, trace, warn, Instrument as _, Span};
5555

5656
use self::paginator::PaginatorError;
5757
use crate::{client::WeakClient, Client};
@@ -228,6 +228,7 @@ impl EventCache {
228228

229229
async move {
230230
while ignore_user_list_stream.next().await.is_some() {
231+
debug!("received a ignore user list change");
231232
if let Err(err) = inner.clear_all_rooms().await {
232233
warn!("error when clearing all room after an ignore list update: {err}");
233234
}

0 commit comments

Comments
 (0)