Skip to content

Commit

Permalink
Add bundler for Yjs adaptor
Browse files Browse the repository at this point in the history
  • Loading branch information
zjkmxy committed Feb 9, 2024
1 parent 5593f4a commit d0ec3f3
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 2 deletions.
2 changes: 2 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ucla-irl/ndnts-aux",
"version": "1.1.0",
"version": "1.1.1-rc1",
"description": "NDNts Auxiliary Package for Web and Deno",
"scripts": {
"test": "deno test --no-check",
Expand Down
76 changes: 76 additions & 0 deletions src/adaptors/bundler.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { assert as assertMod, sleep } from '../dep.ts';
import { Bundler } from './bundler.ts';

const { assertEquals } = assertMod;

export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value);

const joinMerger = (updates: Uint8Array[]) => {
const totLength = updates.reduce((acc, cur) => acc + cur.length, 0);
const ret = new Uint8Array(totLength);
updates.reduce((offset, cur) => {
ret.set(cur, offset);
return offset + cur.length;
}, 0);
return ret;
};

Deno.test('Bundler.1', async () => {
const result: Array<Uint8Array> = [];
const bundler = new Bundler(
joinMerger,
(update) => {
result.push(update);
return Promise.resolve();
},
{
thresholdSize: 100,
delayMs: 100,
},
);

await bundler.produce(b`Hello `);
await bundler.produce(b`World.`);
await sleep(0.1);
assertEquals(result, [b`Hello World.`]);
});

Deno.test('Bundler.2', async () => {
const result: Array<Uint8Array> = [];
const bundler = new Bundler(
joinMerger,
(update) => {
result.push(update);
return Promise.resolve();
},
{
thresholdSize: 3,
delayMs: 100,
},
);

await bundler.produce(b`Hello `);
await bundler.produce(b`World.`);
assertEquals(result, [b`Hello `, b`World.`]);
});

Deno.test('Bundler.3', async () => {
const result: Array<Uint8Array> = [];
const bundler = new Bundler(
joinMerger,
(update) => {
result.push(update);
return Promise.resolve();
},
{
thresholdSize: 10,
delayMs: 100,
},
);

await bundler.produce(b`Hello `);
await bundler.produce(b`World,`);
await bundler.produce(b`Hello `);
await bundler.produce(b`World.`);
assertEquals(result, [b`Hello World,`, b`Hello World.`]);
});
49 changes: 49 additions & 0 deletions src/adaptors/bundler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
export type BundlerOpts = {
thresholdSize?: number;
delayMs?: number;
};

export class Bundler {
protected _bundle: Array<Uint8Array> = [];
protected _timerId: number | undefined;
protected _cumulativeSize = 0;
protected _opts: Required<BundlerOpts>;

constructor(
public readonly merge: (updates: Array<Uint8Array>) => Uint8Array,
public readonly emit: (update: Uint8Array) => Promise<void>,
opts: {
thresholdSize?: number;
delayMs?: number;
} = {},
) {
this._opts = {
thresholdSize: 3000,
delayMs: 200,
...opts,
};
}

public async produce(update: Uint8Array) {
this._bundle.push(update);
this._cumulativeSize += update.byteLength;
if (this._cumulativeSize > this._opts.thresholdSize) {
// Emit immediately
await this.issue();
} else if (!this._timerId) {
// Schedule emit
this._timerId = setTimeout(() => this.issue(), this._opts.delayMs);
}
}

public async issue() {
if (this._timerId) {
clearTimeout(this._timerId);
}
this._timerId = undefined;
const output = this._bundle.length === 1 ? this._bundle[0] : this.merge(this._bundle);
this._bundle = [];
this._cumulativeSize = 0;
await this.emit(output);
}
}
20 changes: 19 additions & 1 deletion src/adaptors/yjs-ndn-adaptor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { SyncAgent } from '../sync-agent/mod.ts';
import * as Y from 'yjs';
import { Awareness } from 'y-protocols/awareness.js';
import { Bundler } from './bundler.ts';

/**
* NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel.
Expand All @@ -23,13 +24,26 @@ export class NdnSvsAdaptor {
return this.#awareness;
}

#bundler: Bundler | undefined;

constructor(
public syncAgent: SyncAgent,
public readonly doc: Y.Doc,
public readonly topic: string,
useBundler: boolean = false,
) {
syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content));
doc.on('update', this.callback);
if (useBundler) {
this.#bundler = new Bundler(
Y.mergeUpdatesV2,
(content) => this.syncAgent.publishUpdate(this.topic, content),
{
thresholdSize: 3000,
delayMs: 200,
},
);
}
}

public bindAwareness(subDoc: Y.Doc, docId: string) {
Expand Down Expand Up @@ -78,7 +92,11 @@ export class NdnSvsAdaptor {
}

private async produce(content: Uint8Array) {
await this.syncAgent.publishUpdate(this.topic, content);
if (this.#bundler) {
await this.#bundler.produce(content);
} else {
await this.syncAgent.publishUpdate(this.topic, content);
}
}

public handleSyncUpdate(content: Uint8Array) {
Expand Down
1 change: 1 addition & 0 deletions src/dep.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
*/
export * as assert from 'https://deno.land/[email protected]/assert/mod.ts';
export * as hex from 'https://deno.land/[email protected]/encoding/hex.ts';
export { sleep } from 'https://deno.land/x/[email protected]/mod.ts';
2 changes: 2 additions & 0 deletions src/workspace/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export class Workspace implements AsyncDisposable {
verifier: Verifier;
onReset?: () => void;
createNewDoc?: () => Promise<void>;
useBundler?: boolean;
}) {
// Always init a new one, and then load.
if (opts.createNewDoc) {
Expand All @@ -50,6 +51,7 @@ export class Workspace implements AsyncDisposable {
syncAgent,
opts.rootDoc,
'doc',
opts.useBundler ?? false,
);
const yjsSnapshotMgr = new YjsStateManager(
() => encodeSyncState(syncAgent!.getUpdateSyncSV()),
Expand Down

0 comments on commit d0ec3f3

Please sign in to comment.