Skip to content

Commit

Permalink
feat(zero-cache): pusher service
Browse files Browse the repository at this point in the history
Service that sits between zero-client and the user's API server. Receives pushes and forwards them, in order, to the user's API server. Does batching in the case where a client gets ahead of the API server.
  • Loading branch information
tantaman committed Feb 7, 2025
1 parent 42b9229 commit c903682
Show file tree
Hide file tree
Showing 2 changed files with 497 additions and 0 deletions.
272 changes: 272 additions & 0 deletions packages/zero-cache/src/services/mutagen/pusher.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
import {describe, expect, test, vi} from 'vitest';
import {combinePushes, PusherService, Queue} from './pusher.ts';
import type {Mutation, PushBody} from '../../../../zero-protocol/src/push.ts';
import {createSilentLogContext} from '../../../../shared/src/logging-test-utils.ts';
import {resolver} from '@rocicorp/resolver';

describe('queue', () => {
test('a consumer blocks until tasks are available', async () => {
const queue = new Queue<number>();
const promise = queue.awaitTasks().then(() => {
const tasks = queue.drain();
expect(tasks).toEqual([1]);
});
queue.enqueue(1);
await promise;
});

test('drain will get all tasks that were accumulated in the prior tick', async () => {
const queue = new Queue<number>();
const promise = queue.awaitTasks().then(() => {
const tasks = queue.drain();
expect(tasks).toEqual([1, 2, 3]);
});
queue.enqueue(1);
queue.enqueue(2);
queue.enqueue(3);
await promise;
});

test('a consumer is called if tasks are already available', async () => {
const queue = new Queue<number>();
queue.enqueue(1);
const promise = queue.awaitTasks().then(() => {
const tasks = queue.drain();
expect(tasks).toEqual([1]);
});
await promise;
});

test('drain will get all tasks that were accumulated in the prior tick | 2', async () => {
const queue = new Queue<number>();
queue.enqueue(1);
const promise = queue.awaitTasks().then(() => {
const tasks = queue.drain();
expect(tasks).toEqual([1, 2, 3]);
});
queue.enqueue(2);
queue.enqueue(3);
await promise;
});
});

describe('combine pushes', () => {
test('empty array', () => {
const [pushes, terminate] = combinePushes([]);
expect(pushes).toEqual([]);
expect(terminate).toBe(false);
});

test('same JWT for all pushes', () => {
const [pushes, terminate] = combinePushes([
{
push: makePush(1),
jwt: 'a',
},
{
push: makePush(1),
jwt: 'a',
},
{
push: makePush(1),
jwt: 'a',
},
]);
expect(pushes).toHaveLength(1);
expect(terminate).toBe(false);
expect(pushes[0].push.mutations).toHaveLength(3);
});

test('different JWT groups', () => {
const [pushes, terminate] = combinePushes([
{
push: makePush(1),
jwt: 'a',
},
{
push: makePush(1),
jwt: 'a',
},
{
push: makePush(1),
jwt: 'c',
},
{
push: makePush(1),
jwt: 'b',
},
{
push: makePush(1),
jwt: 'b',
},
{
push: makePush(1),
jwt: 'c',
},
]);
expect(pushes).toHaveLength(4);
expect(terminate).toBe(false);
expect(pushes[0].push.mutations).toHaveLength(2);
expect(pushes[0].jwt).toBe('a');
expect(pushes[1].push.mutations).toHaveLength(1);
expect(pushes[1].jwt).toBe('c');
expect(pushes[2].push.mutations).toHaveLength(2);
expect(pushes[2].jwt).toBe('b');
expect(pushes[3].push.mutations).toHaveLength(1);
expect(pushes[3].jwt).toBe('c');
});

test('stop', () => {
const [pushes, terminate] = combinePushes(['stop']);
expect(pushes).toEqual([]);
expect(terminate).toBe(true);
});

test('stop after pushes', () => {
const [pushes, terminate] = combinePushes([
{
push: makePush(1),
jwt: 'a',
},
{
push: makePush(1),
jwt: 'a',
},
'stop',
]);
expect(pushes).toHaveLength(1);
expect(terminate).toBe(true);
});

test('stop in the middle', () => {
const [pushes, terminate] = combinePushes([
{
push: makePush(1),
jwt: 'a',
},
'stop',
{
push: makePush(1),
jwt: 'a',
},
]);
expect(pushes).toHaveLength(1);
expect(terminate).toBe(true);
});
});

const lc = createSilentLogContext();
describe('pusher service', () => {
test('the service can be stopped', async () => {
const pusher = new PusherService(
lc,
'cgid',
'http://exmaple.com',
undefined,
);
let shutDown = false;
void pusher.run().then(() => {
shutDown = true;
});
await pusher.stop();
expect(shutDown).toBe(true);
});

test('the service sets authorization headers', async () => {
const fetch = (global.fetch = vi.fn());
fetch.mockResolvedValue({
ok: true,
});

const pusher = new PusherService(
lc,
'cgid',
'http://exmaple.com',
'api-key',
);
void pusher.run();

pusher.enqueuePush(makePush(1), 'jwt');

await pusher.stop();

expect(fetch.mock.calls[0][1]?.headers).toEqual({
'Content-Type': 'application/json',
'X-Api-Key': 'api-key',
// eslint-disable-next-line @typescript-eslint/naming-convention
'Authorization': 'Bearer jwt',
});

fetch.mockReset();
});

test('the service correctly batches pushes when the API server is delayed', async () => {
const fetch = (global.fetch = vi.fn());
const apiServerReturn = resolver();
fetch.mockImplementation(async (_url: string, _options: RequestInit) => {
await apiServerReturn.promise;
});

const pusher = new PusherService(
lc,
'cgid',
'http://exmaple.com',
'api-key',
);

void pusher.run();
pusher.enqueuePush(makePush(1), 'jwt');
// release control of the loop so the push can be sent
await Promise.resolve();

// We should have sent the first push
expect(fetch.mock.calls).toHaveLength(1);
expect(JSON.parse(fetch.mock.calls[0][1].body).mutations).toHaveLength(1);

// We have not resolved the API server yet so these should stack up
pusher.enqueuePush(makePush(1), 'jwt');
await Promise.resolve();
pusher.enqueuePush(makePush(1), 'jwt');
await Promise.resolve();
pusher.enqueuePush(makePush(1), 'jwt');
await Promise.resolve();

// no new pushes sent yet since we are still waiting on the user's API server
expect(fetch.mock.calls).toHaveLength(1);

// let the API server go
apiServerReturn.resolve();
// wait for the pusher to finish
await new Promise(resolve => {
setTimeout(resolve, 0);
});

// We sent all the pushes in one batch
expect(JSON.parse(fetch.mock.calls[1][1].body).mutations).toHaveLength(3);
expect(fetch.mock.calls).toHaveLength(2);
});
});

let timestamp = 0;
let id = 0;
function makePush(numMutations: number): PushBody {
return {
clientGroupID: 'cgid',
mutations: Array.from({length: numMutations}, makeMutation),
pushVersion: 1,
requestID: 'rid',
schemaVersion: 1,
timestamp: ++timestamp,
};
}

function makeMutation(): Mutation {
return {
type: 'custom',
args: [],
clientID: 'cid',
id: ++id,
name: 'n',
timestamp: ++timestamp,
} as const;
}
Loading

0 comments on commit c903682

Please sign in to comment.