Skip to content

Commit 0780b64

Browse files
authored
chore(replicache): Add onClientsDeleted callback (#3666)
Replicache has 3 different gc processes: - Collect IDB (also applies to mem if you keep your browser open long enough) - When this happens we look at all the clients in the store before deleting it and call the callback. - Collect Client: - When a client gets collected we call the callback - Collect Client Group: - A client group is only collected if it no longer has any clients in it so there is nothing to do in this case. This callback will be used in Zero to tell the server about the deleted clients.
1 parent d20ee0b commit 0780b64

File tree

9 files changed

+362
-204
lines changed

9 files changed

+362
-204
lines changed

packages/replicache/src/persist/client-gc.test.ts

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {LogContext} from '@rocicorp/logger';
22
import {type SinonFakeTimers, useFakeTimers} from 'sinon';
3-
import {afterEach, beforeEach, expect, test} from 'vitest';
3+
import {afterEach, beforeEach, expect, test, vi} from 'vitest';
44
import {assertNotUndefined} from '../../../shared/src/asserts.ts';
55
import type {Read} from '../dag/store.ts';
66
import {TestStore} from '../dag/test-store.ts';
@@ -13,7 +13,12 @@ import {
1313
initClientGC,
1414
} from './client-gc.ts';
1515
import {makeClientV4, setClientsForTesting} from './clients-test-helpers.ts';
16-
import {type ClientMap, getClients, setClient} from './clients.ts';
16+
import {
17+
type ClientMap,
18+
getClients,
19+
type OnClientsDeleted,
20+
setClient,
21+
} from './clients.ts';
1722

1823
let clock: SinonFakeTimers;
1924
const START_TIME = 0;
@@ -66,11 +71,13 @@ test('initClientGC starts 5 min interval that collects clients that have been in
6671
await setClientsForTesting(clientMap, dagStore);
6772

6873
const controller = new AbortController();
74+
const onClientsDeleted = vi.fn<OnClientsDeleted>();
6975
initClientGC(
7076
'client1',
7177
dagStore,
7278
CLIENT_MAX_INACTIVE_TIME,
7379
GC_INTERVAL,
80+
onClientsDeleted,
7481
new LogContext(),
7582
controller.signal,
7683
);
@@ -96,6 +103,9 @@ test('initClientGC starts 5 min interval that collects clients that have been in
96103
client4,
97104
});
98105
});
106+
expect(onClientsDeleted).toHaveBeenCalledTimes(1);
107+
expect(onClientsDeleted).toHaveBeenCalledWith(['client2']);
108+
onClientsDeleted.mockClear();
99109

100110
// Update client4's heartbeat to now
101111
const client4WUpdatedHeartbeat = {
@@ -120,6 +130,9 @@ test('initClientGC starts 5 min interval that collects clients that have been in
120130
client4: client4WUpdatedHeartbeat,
121131
});
122132
});
133+
expect(onClientsDeleted).toHaveBeenCalledTimes(1);
134+
expect(onClientsDeleted).toHaveBeenCalledWith(['client3']);
135+
onClientsDeleted.mockClear();
123136

124137
clock.tick(24 * HOURS - 5 * MINUTES * 2 + 1);
125138
await clock.tickAsync(5 * MINUTES);
@@ -133,4 +146,6 @@ test('initClientGC starts 5 min interval that collects clients that have been in
133146
client1,
134147
});
135148
});
149+
expect(onClientsDeleted).toHaveBeenCalledTimes(1);
150+
expect(onClientsDeleted).toHaveBeenCalledWith(['client4']);
136151
});

packages/replicache/src/persist/client-gc.ts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {initBgIntervalProcess} from '../bg-interval.ts';
33
import type {Store} from '../dag/store.ts';
44
import type {ClientID} from '../sync/ids.ts';
55
import {withWrite} from '../with-transactions.ts';
6+
import type {Client, OnClientsDeleted} from './clients.ts';
67
import {type ClientMap, getClients, setClients} from './clients.ts';
78

89
/**
@@ -27,13 +28,19 @@ export function initClientGC(
2728
dagStore: Store,
2829
clientMaxInactiveTime: number,
2930
gcInterval: number,
31+
onClientsDeleted: OnClientsDeleted,
3032
lc: LogContext,
3133
signal: AbortSignal,
3234
): void {
3335
initBgIntervalProcess(
3436
'ClientGC',
3537
() => {
36-
latestGCUpdate = gcClients(clientID, dagStore, clientMaxInactiveTime);
38+
latestGCUpdate = gcClients(
39+
clientID,
40+
dagStore,
41+
clientMaxInactiveTime,
42+
onClientsDeleted,
43+
);
3744
return latestGCUpdate;
3845
},
3946
() => gcInterval,
@@ -46,20 +53,29 @@ function gcClients(
4653
clientID: ClientID,
4754
dagStore: Store,
4855
clientMaxInactiveTime: number,
56+
onClientsDeleted: OnClientsDeleted,
4957
): Promise<ClientMap> {
5058
return withWrite(dagStore, async dagWrite => {
5159
const now = Date.now();
5260
const clients = await getClients(dagWrite);
53-
const clientsAfterGC = Array.from(clients).filter(
54-
([id, client]) =>
61+
const deletedClients: ClientID[] = [];
62+
const newClients: Map<ClientID, Client> = new Map();
63+
for (const [id, client] of clients) {
64+
if (
5565
id === clientID /* never collect ourself */ ||
56-
now - client.heartbeatTimestampMs <= clientMaxInactiveTime,
57-
);
58-
if (clientsAfterGC.length === clients.size) {
66+
now - client.heartbeatTimestampMs <= clientMaxInactiveTime
67+
) {
68+
newClients.set(id, client);
69+
} else {
70+
deletedClients.push(id);
71+
}
72+
}
73+
74+
if (newClients.size === clients.size) {
5975
return clients;
6076
}
61-
const newClients = new Map(clientsAfterGC);
6277
await setClients(newClients, dagWrite);
78+
onClientsDeleted(deletedClients);
6379
return newClients;
6480
});
6581
}

packages/replicache/src/persist/client-group-gc.test.ts

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {assertNotUndefined} from '../../../shared/src/asserts.ts';
55
import type {Read} from '../dag/store.ts';
66
import {TestStore} from '../dag/test-store.ts';
77
import {fakeHash} from '../hash.ts';
8-
import {withRead, withWriteNoImplicitCommit} from '../with-transactions.ts';
8+
import {withRead, withWrite} from '../with-transactions.ts';
99
import {getLatestGCUpdate, initClientGroupGC} from './client-group-gc.ts';
1010
import {
1111
type ClientGroup,
@@ -69,21 +69,17 @@ test('initClientGroupGC starts 5 min interval that collects client groups that a
6969
lastServerAckdMutationIDs: {},
7070
disabled: false,
7171
};
72-
const clientGroupMap = await withWriteNoImplicitCommit(
73-
dagStore,
74-
async write => {
75-
const clientGroupMap = new Map(
76-
Object.entries({
77-
'client-group-1': clientGroup1,
78-
'client-group-2': clientGroup2,
79-
'client-group-3': clientGroup3,
80-
}),
81-
);
82-
await setClientGroups(clientGroupMap, write);
83-
await write.commit();
84-
return clientGroupMap;
85-
},
86-
);
72+
const clientGroupMap = await withWrite(dagStore, async write => {
73+
const clientGroupMap = new Map(
74+
Object.entries({
75+
'client-group-1': clientGroup1,
76+
'client-group-2': clientGroup2,
77+
'client-group-3': clientGroup3,
78+
}),
79+
);
80+
await setClientGroups(clientGroupMap, write);
81+
return clientGroupMap;
82+
});
8783
const client1 = makeClientV6({
8884
heartbeatTimestampMs: START_TIME,
8985
refreshHashes: [fakeHash('eadce1')],
@@ -161,10 +157,8 @@ test('initClientGroupGC starts 5 min interval that collects client groups that a
161157
...clientGroup1,
162158
lastServerAckdMutationIDs: clientGroup1.mutationIDs,
163159
};
164-
await withWriteNoImplicitCommit(dagStore, async write => {
160+
await withWrite(dagStore, async write => {
165161
await setClientGroup('client-group-1', updatedClientGroup1, write);
166-
await write.commit();
167-
return clientGroupMap;
168162
});
169163

170164
// nothing collected yet because gc has not run yet

packages/replicache/src/persist/clients.test.ts

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import {deepFreeze} from '../frozen-json.ts';
1818
import {assertHash, fakeHash, newRandomHash} from '../hash.ts';
1919
import type {IndexDefinitions} from '../index-defs.ts';
2020
import type {ClientGroupID, ClientID} from '../sync/ids.ts';
21-
import {withRead, withWriteNoImplicitCommit} from '../with-transactions.ts';
21+
import {withRead, withWrite} from '../with-transactions.ts';
2222
import {
2323
type ClientGroup,
2424
getClientGroup,
@@ -295,9 +295,8 @@ test('getClient', async () => {
295295

296296
test('updateClients throws errors if clients head exist but the chunk it references does not', async () => {
297297
const dagStore = new TestStore();
298-
await withWriteNoImplicitCommit(dagStore, async (write: Write) => {
298+
await withWrite(dagStore, async (write: Write) => {
299299
await write.setHead('clients', randomStuffHash);
300-
await write.commit();
301300
});
302301
await withRead(dagStore, async (read: Read) => {
303302
let e;
@@ -312,7 +311,7 @@ test('updateClients throws errors if clients head exist but the chunk it referen
312311

313312
test('updateClients throws errors if chunk pointed to by clients head does not contain a valid ClientMap', async () => {
314313
const dagStore = new TestStore();
315-
await withWriteNoImplicitCommit(dagStore, async (write: Write) => {
314+
await withWrite(dagStore, async (write: Write) => {
316315
const headHash = headClient1Hash;
317316
const chunk = write.createChunk(
318317
deepFreeze({
@@ -326,7 +325,6 @@ test('updateClients throws errors if chunk pointed to by clients head does not c
326325
write.putChunk(chunk),
327326
write.setHead('clients', chunk.hash),
328327
]);
329-
await write.commit();
330328
});
331329
await withRead(dagStore, async (read: Read) => {
332330
let e;
@@ -393,9 +391,8 @@ test('setClient', async () => {
393391
const dagStore = new TestStore();
394392

395393
const t = async (clientID: ClientID, client: ClientV5) => {
396-
await withWriteNoImplicitCommit(dagStore, async (write: Write) => {
394+
await withWrite(dagStore, async (write: Write) => {
397395
await setClient(clientID, client, write);
398-
await write.commit();
399396
});
400397

401398
await withRead(dagStore, async (read: Read) => {
@@ -439,10 +436,9 @@ test('getClientGroupID', async () => {
439436
expectedClientGroupID: ClientGroupID | undefined,
440437
expectedClientGroup: ClientGroup | undefined,
441438
) => {
442-
await withWriteNoImplicitCommit(dagStore, async write => {
439+
await withWrite(dagStore, async write => {
443440
await setClient(clientID, client, write);
444441
await setClientGroup(clientGroupID, clientGroup, write);
445-
await write.commit();
446442
});
447443

448444
const actualClientGroupID = await withRead(dagStore, read =>
@@ -545,7 +541,7 @@ describe('findMatchingClient', () => {
545541
await b.addGenesis(clientID);
546542
await b.addLocal(clientID, []);
547543

548-
await withWriteNoImplicitCommit(perdag, async write => {
544+
await withWrite(perdag, async write => {
549545
const client: ClientV5 = {
550546
clientGroupID,
551547
headHash: b.chain[1].chunk.hash,
@@ -563,8 +559,6 @@ describe('findMatchingClient', () => {
563559
disabled: initialDisabled,
564560
};
565561
await setClientGroup(clientGroupID, clientGroup, write);
566-
567-
await write.commit();
568562
});
569563

570564
await withRead(perdag, async read => {
@@ -646,9 +640,8 @@ describe('findMatchingClient', () => {
646640
mutatorNames: initialMutatorNames,
647641
disabled: false,
648642
};
649-
await withWriteNoImplicitCommit(perdag, async write => {
643+
await withWrite(perdag, async write => {
650644
await setClientGroup(clientGroupID, clientGroup, write);
651-
await write.commit();
652645
});
653646

654647
await chainBuilder.removeHead();
@@ -736,10 +729,9 @@ describe('initClientV6', () => {
736729
disabled: false,
737730
};
738731

739-
await withWriteNoImplicitCommit(perdag, async write => {
732+
await withWrite(perdag, async write => {
740733
await setClient(clientID1, client1, write);
741734
await setClientGroup(clientGroupID, clientGroup1, write);
742-
await write.commit();
743735
});
744736

745737
const clientID2 = makeClientID();
@@ -820,10 +812,9 @@ describe('initClientV6', () => {
820812
disabled: false,
821813
};
822814

823-
await withWriteNoImplicitCommit(perdag, async write => {
815+
await withWrite(perdag, async write => {
824816
await setClient(clientID1, client1, write);
825817
await setClientGroup(clientGroupID1, clientGroup1, write);
826-
await write.commit();
827818
});
828819

829820
const clientID2 = makeClientID();
@@ -938,10 +929,9 @@ describe('initClientV6', () => {
938929
disabled: false,
939930
};
940931

941-
await withWriteNoImplicitCommit(perdag, async write => {
932+
await withWrite(perdag, async write => {
942933
await setClient(clientID1, client1, write);
943934
await setClientGroup(clientGroupID1, clientGroup1, write);
944-
await write.commit();
945935
});
946936

947937
const clientID2 = makeClientID();

packages/replicache/src/persist/clients.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,3 +604,8 @@ export async function setClients(
604604
await dagWrite.setHead(CLIENTS_HEAD_NAME, chunk.hash);
605605
return chunk.hash;
606606
}
607+
608+
/**
609+
* Callback function for when Replicache has deleted one or more clients.
610+
*/
611+
export type OnClientsDeleted = (clientIDs: ClientID[]) => void;

0 commit comments

Comments
 (0)