From 20df33cc3b313477477424ae88577a853c3be9bf Mon Sep 17 00:00:00 2001 From: Xinyu Ma Date: Sun, 7 Apr 2024 23:56:23 -0700 Subject: [PATCH] Let RTO be smaller; Let bundler reset timer on continuous typing --- src/adaptors/bundler.test.ts | 39 +++++++++++++++++++++++++++++++++ src/adaptors/bundler.ts | 17 +++++++++++++- src/adaptors/yjs-ndn-adaptor.ts | 3 ++- src/sync-agent/deliveries.ts | 11 +++++----- src/sync-agent/sync-agent.ts | 1 + 5 files changed, 64 insertions(+), 7 deletions(-) diff --git a/src/adaptors/bundler.test.ts b/src/adaptors/bundler.test.ts index 9a9d001..f7ca025 100644 --- a/src/adaptors/bundler.test.ts +++ b/src/adaptors/bundler.test.ts @@ -74,3 +74,42 @@ Deno.test('Bundler.3', async () => { await bundler.produce(b`World.`); assertEquals(result, [b`Hello World,`, b`Hello World.`]); }); + +Deno.test('Bundler.4', async () => { + const result: Array = []; + const bundler = new Bundler( + joinMerger, + (update) => { + result.push(update); + return Promise.resolve(); + }, + { + thresholdSize: 1000, + delayMs: 300, + maxDelayMs: 800, + }, + ); + + await bundler.produce(b`A`); + await bundler.produce(b`B`); + await sleep(0.4); // Trigger break + await bundler.produce(b`C`); + await bundler.produce(b`D`); + await sleep(0.25); // No break + await bundler.produce(b`E`); + await bundler.produce(b`F`); + await sleep(0.25); // No break + await bundler.produce(b`G`); + await bundler.produce(b`H`); + await sleep(0.25); // No break + await bundler.produce(b`I`); + await bundler.produce(b`J`); + await sleep(0.25); // With break + await bundler.produce(b`K`); + await bundler.produce(b`L`); + await sleep(0.1); // No break + await bundler.produce(b`M`); + await bundler.produce(b`N`); + await sleep(0.4); // With break + assertEquals(result, [b`AB`, b`CDEFGHIJ`, b`KLMN`]); +}); diff --git a/src/adaptors/bundler.ts b/src/adaptors/bundler.ts index 9348cef..e31af38 100644 --- a/src/adaptors/bundler.ts +++ b/src/adaptors/bundler.ts @@ -1,11 +1,13 @@ export type BundlerOpts = { thresholdSize?: number; delayMs?: number; + maxDelayMs?: number; }; export class Bundler { protected _bundle: Array = []; protected _timerId: number | undefined; + protected _startTime = 0; protected _cumulativeSize = 0; protected _opts: Required; @@ -15,11 +17,13 @@ export class Bundler { opts: { thresholdSize?: number; delayMs?: number; + maxDelayMs?: number; } = {}, ) { this._opts = { thresholdSize: 3000, - delayMs: 200, + delayMs: 400, // combine as typing + maxDelayMs: 1600, ...opts, }; } @@ -33,6 +37,16 @@ export class Bundler { } else if (!this._timerId) { // Schedule emit this._timerId = setTimeout(() => this.issue(), this._opts.delayMs); + this._startTime = Date.now(); + } else { + // Delay for longer when there is input + const timePast = Date.now() - this._startTime; + const maxDelayRemaining = this._opts.maxDelayMs - timePast; + if (maxDelayRemaining > 10) { + const nextDelay = Math.min(this._opts.delayMs, maxDelayRemaining); + clearTimeout(this._timerId); + this._timerId = setTimeout(() => this.issue(), nextDelay); + } } } @@ -41,6 +55,7 @@ export class Bundler { clearTimeout(this._timerId); } this._timerId = undefined; + this._startTime = Date.now(); const output = this._bundle.length === 1 ? this._bundle[0] : this.merge(this._bundle); this._bundle = []; this._cumulativeSize = 0; diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index 90c6123..bb35d5d 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -40,7 +40,8 @@ export class NdnSvsAdaptor { (content) => this.syncAgent.publishUpdate(this.topic, content), { thresholdSize: 3000, - delayMs: 200, + delayMs: 400, + maxDelayMs: 1600, }, ); } diff --git a/src/sync-agent/deliveries.ts b/src/sync-agent/deliveries.ts index e1011b7..e97cd83 100644 --- a/src/sync-agent/deliveries.ts +++ b/src/sync-agent/deliveries.ts @@ -243,12 +243,13 @@ export class AtLeastOnceDelivery extends SyncDelivery { const continuation = fetchSegments(prefix, { segmentNumConvention: SequenceNum, segmentRange: [update.loSeqNum, update.hiSeqNum + 1], - retxLimit: 40, - lifetimeAfterRto: 1000, // The true timeout timer is the RTO + retxLimit: 80, + lifetimeAfterRto: 1000, // Lifetime = RTO + 1000 + // The true timeout timer is the RTO, specified below rtte: { - initRto: 1000, - minRto: 1000, // Minimal RTO is 1000 - maxRto: 120000, + initRto: 50, + minRto: 50, // Minimal RTO is 50ms + maxRto: 2000, }, ca: new LimitedCwnd(new TcpCubic(), 10), verifier: this.verifier, diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index b04bd49..ffde552 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -201,6 +201,7 @@ export class SyncAgent implements AsyncDisposable { verifier: this.verifier, modifyInterest: { mustBeFresh: true }, lifetimeAfterRto: 2000, + retxLimit: 25, }); for await (const segment of result) { // Cache packets