From 6de7f6784d88eacef4e27e3d0924514a4d32201a Mon Sep 17 00:00:00 2001 From: Xinyu Ma Date: Fri, 9 Feb 2024 11:18:10 -0800 Subject: [PATCH] Add bundler for Yjs adaptor --- deno.lock | 2 + package.json | 2 +- src/adaptors/bundler.test.ts | 76 +++++++++++++++++++++++++++++++++ src/adaptors/bundler.ts | 49 +++++++++++++++++++++ src/adaptors/yjs-ndn-adaptor.ts | 20 ++++++++- src/dep.ts | 1 + src/workspace/workspace.ts | 2 + 7 files changed, 150 insertions(+), 2 deletions(-) create mode 100644 src/adaptors/bundler.test.ts create mode 100644 src/adaptors/bundler.ts diff --git a/deno.lock b/deno.lock index 1cb0ec2..0e5c9a7 100644 --- a/deno.lock +++ b/deno.lock @@ -181,6 +181,8 @@ "https://deno.land/x/dnt@0.39.0/lib/utils.ts": "224f15f33e7226a2fd991e438d0291d7ed8c7889807efa2e1ecb67d2d1db6720", "https://deno.land/x/dnt@0.39.0/mod.ts": "9df36a862161d9eb376472b699f6cb08ba0ad1704e0826fbe13be766bd3c01da", "https://deno.land/x/dnt@0.39.0/transform.ts": "f68743a14cf9bf53bfc9c81073871d69d447a7f9e3453e0447ca2fb78926bb1d", + "https://deno.land/x/sleep@v1.3.0/mod.ts": "e9955ecd3228a000e29d46726cd6ab14b65cf83904e9b365f3a8d64ec61c1af3", + "https://deno.land/x/sleep@v1.3.0/sleep.ts": "b6abaca093b094b0c2bba94f287b19a60946a8d15764d168f83fcf555f5bb59e", "https://deno.land/x/ts_morph@20.0.0/bootstrap/mod.ts": "b53aad517f106c4079971fcd4a81ab79fadc40b50061a3ab2b741a09119d51e9", "https://deno.land/x/ts_morph@20.0.0/bootstrap/ts_morph_bootstrap.js": "6645ac03c5e6687dfa8c78109dc5df0250b811ecb3aea2d97c504c35e8401c06", "https://deno.land/x/ts_morph@20.0.0/common/DenoRuntime.ts": "6a7180f0c6e90dcf23ccffc86aa8271c20b1c4f34c570588d08a45880b7e172d", diff --git a/package.json b/package.json index be5be27..d60e52c 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ucla-irl/ndnts-aux", - "version": "1.1.0", + "version": "1.1.1-rc2", "description": "NDNts Auxiliary Package for Web and Deno", "scripts": { "test": "deno test --no-check", diff --git a/src/adaptors/bundler.test.ts b/src/adaptors/bundler.test.ts new file mode 100644 index 0000000..9a9d001 --- /dev/null +++ b/src/adaptors/bundler.test.ts @@ -0,0 +1,76 @@ +import { assert as assertMod, sleep } from '../dep.ts'; +import { Bundler } from './bundler.ts'; + +const { assertEquals } = assertMod; + +export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value); + +const joinMerger = (updates: Uint8Array[]) => { + const totLength = updates.reduce((acc, cur) => acc + cur.length, 0); + const ret = new Uint8Array(totLength); + updates.reduce((offset, cur) => { + ret.set(cur, offset); + return offset + cur.length; + }, 0); + return ret; +}; + +Deno.test('Bundler.1', async () => { + const result: Array = []; + const bundler = new Bundler( + joinMerger, + (update) => { + result.push(update); + return Promise.resolve(); + }, + { + thresholdSize: 100, + delayMs: 100, + }, + ); + + await bundler.produce(b`Hello `); + await bundler.produce(b`World.`); + await sleep(0.1); + assertEquals(result, [b`Hello World.`]); +}); + +Deno.test('Bundler.2', async () => { + const result: Array = []; + const bundler = new Bundler( + joinMerger, + (update) => { + result.push(update); + return Promise.resolve(); + }, + { + thresholdSize: 3, + delayMs: 100, + }, + ); + + await bundler.produce(b`Hello `); + await bundler.produce(b`World.`); + assertEquals(result, [b`Hello `, b`World.`]); +}); + +Deno.test('Bundler.3', async () => { + const result: Array = []; + const bundler = new Bundler( + joinMerger, + (update) => { + result.push(update); + return Promise.resolve(); + }, + { + thresholdSize: 10, + delayMs: 100, + }, + ); + + await bundler.produce(b`Hello `); + await bundler.produce(b`World,`); + await bundler.produce(b`Hello `); + await bundler.produce(b`World.`); + assertEquals(result, [b`Hello World,`, b`Hello World.`]); +}); diff --git a/src/adaptors/bundler.ts b/src/adaptors/bundler.ts new file mode 100644 index 0000000..9348cef --- /dev/null +++ b/src/adaptors/bundler.ts @@ -0,0 +1,49 @@ +export type BundlerOpts = { + thresholdSize?: number; + delayMs?: number; +}; + +export class Bundler { + protected _bundle: Array = []; + protected _timerId: number | undefined; + protected _cumulativeSize = 0; + protected _opts: Required; + + constructor( + public readonly merge: (updates: Array) => Uint8Array, + public readonly emit: (update: Uint8Array) => Promise, + opts: { + thresholdSize?: number; + delayMs?: number; + } = {}, + ) { + this._opts = { + thresholdSize: 3000, + delayMs: 200, + ...opts, + }; + } + + public async produce(update: Uint8Array) { + this._bundle.push(update); + this._cumulativeSize += update.byteLength; + if (this._cumulativeSize > this._opts.thresholdSize) { + // Emit immediately + await this.issue(); + } else if (!this._timerId) { + // Schedule emit + this._timerId = setTimeout(() => this.issue(), this._opts.delayMs); + } + } + + public async issue() { + if (this._timerId) { + clearTimeout(this._timerId); + } + this._timerId = undefined; + const output = this._bundle.length === 1 ? this._bundle[0] : this.merge(this._bundle); + this._bundle = []; + this._cumulativeSize = 0; + await this.emit(output); + } +} diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index d9d2dfb..90c6123 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -1,6 +1,7 @@ import { SyncAgent } from '../sync-agent/mod.ts'; import * as Y from 'yjs'; import { Awareness } from 'y-protocols/awareness.js'; +import { Bundler } from './bundler.ts'; /** * NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel. @@ -23,13 +24,26 @@ export class NdnSvsAdaptor { return this.#awareness; } + #bundler: Bundler | undefined; + constructor( public syncAgent: SyncAgent, public readonly doc: Y.Doc, public readonly topic: string, + useBundler: boolean = false, ) { syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content)); doc.on('update', this.callback); + if (useBundler) { + this.#bundler = new Bundler( + Y.mergeUpdates, + (content) => this.syncAgent.publishUpdate(this.topic, content), + { + thresholdSize: 3000, + delayMs: 200, + }, + ); + } } public bindAwareness(subDoc: Y.Doc, docId: string) { @@ -78,7 +92,11 @@ export class NdnSvsAdaptor { } private async produce(content: Uint8Array) { - await this.syncAgent.publishUpdate(this.topic, content); + if (this.#bundler) { + await this.#bundler.produce(content); + } else { + await this.syncAgent.publishUpdate(this.topic, content); + } } public handleSyncUpdate(content: Uint8Array) { diff --git a/src/dep.ts b/src/dep.ts index f50db33..788c61b 100644 --- a/src/dep.ts +++ b/src/dep.ts @@ -5,3 +5,4 @@ */ export * as assert from 'https://deno.land/std@0.212.0/assert/mod.ts'; export * as hex from 'https://deno.land/std@0.212.0/encoding/hex.ts'; +export { sleep } from 'https://deno.land/x/sleep@v1.3.0/mod.ts'; diff --git a/src/workspace/workspace.ts b/src/workspace/workspace.ts index 09f2d92..afafb61 100644 --- a/src/workspace/workspace.ts +++ b/src/workspace/workspace.ts @@ -26,6 +26,7 @@ export class Workspace implements AsyncDisposable { verifier: Verifier; onReset?: () => void; createNewDoc?: () => Promise; + useBundler?: boolean; }) { // Always init a new one, and then load. if (opts.createNewDoc) { @@ -50,6 +51,7 @@ export class Workspace implements AsyncDisposable { syncAgent, opts.rootDoc, 'doc', + opts.useBundler ?? false, ); const yjsSnapshotMgr = new YjsStateManager( () => encodeSyncState(syncAgent!.getUpdateSyncSV()),