Skip to content

Commit

Permalink
Let RTO be smaller; Let bundler reset timer on continuous typing
Browse files Browse the repository at this point in the history
  • Loading branch information
zjkmxy committed Apr 8, 2024
1 parent 330c34c commit 20df33c
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 7 deletions.
39 changes: 39 additions & 0 deletions src/adaptors/bundler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,42 @@ Deno.test('Bundler.3', async () => {
await bundler.produce(b`World.`);
assertEquals(result, [b`Hello World,`, b`Hello World.`]);
});

Deno.test('Bundler.4', async () => {
const result: Array<Uint8Array> = [];
const bundler = new Bundler(
joinMerger,
(update) => {
result.push(update);
return Promise.resolve();
},
{
thresholdSize: 1000,
delayMs: 300,
maxDelayMs: 800,
},
);

await bundler.produce(b`A`);
await bundler.produce(b`B`);
await sleep(0.4); // Trigger break
await bundler.produce(b`C`);
await bundler.produce(b`D`);
await sleep(0.25); // No break
await bundler.produce(b`E`);
await bundler.produce(b`F`);
await sleep(0.25); // No break
await bundler.produce(b`G`);
await bundler.produce(b`H`);
await sleep(0.25); // No break
await bundler.produce(b`I`);
await bundler.produce(b`J`);
await sleep(0.25); // With break
await bundler.produce(b`K`);
await bundler.produce(b`L`);
await sleep(0.1); // No break
await bundler.produce(b`M`);
await bundler.produce(b`N`);
await sleep(0.4); // With break
assertEquals(result, [b`AB`, b`CDEFGHIJ`, b`KLMN`]);
});
17 changes: 16 additions & 1 deletion src/adaptors/bundler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
export type BundlerOpts = {
thresholdSize?: number;
delayMs?: number;
maxDelayMs?: number;
};

export class Bundler {
protected _bundle: Array<Uint8Array> = [];
protected _timerId: number | undefined;
protected _startTime = 0;
protected _cumulativeSize = 0;
protected _opts: Required<BundlerOpts>;

Expand All @@ -15,11 +17,13 @@ export class Bundler {
opts: {
thresholdSize?: number;
delayMs?: number;
maxDelayMs?: number;
} = {},
) {
this._opts = {
thresholdSize: 3000,
delayMs: 200,
delayMs: 400, // combine as typing
maxDelayMs: 1600,
...opts,
};
}
Expand All @@ -33,6 +37,16 @@ export class Bundler {
} else if (!this._timerId) {
// Schedule emit
this._timerId = setTimeout(() => this.issue(), this._opts.delayMs);
this._startTime = Date.now();
} else {
// Delay for longer when there is input
const timePast = Date.now() - this._startTime;
const maxDelayRemaining = this._opts.maxDelayMs - timePast;
if (maxDelayRemaining > 10) {
const nextDelay = Math.min(this._opts.delayMs, maxDelayRemaining);
clearTimeout(this._timerId);
this._timerId = setTimeout(() => this.issue(), nextDelay);
}
}
}

Expand All @@ -41,6 +55,7 @@ export class Bundler {
clearTimeout(this._timerId);
}
this._timerId = undefined;
this._startTime = Date.now();
const output = this._bundle.length === 1 ? this._bundle[0] : this.merge(this._bundle);
this._bundle = [];
this._cumulativeSize = 0;
Expand Down
3 changes: 2 additions & 1 deletion src/adaptors/yjs-ndn-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export class NdnSvsAdaptor {
(content) => this.syncAgent.publishUpdate(this.topic, content),
{
thresholdSize: 3000,
delayMs: 200,
delayMs: 400,
maxDelayMs: 1600,
},
);
}
Expand Down
11 changes: 6 additions & 5 deletions src/sync-agent/deliveries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,13 @@ export class AtLeastOnceDelivery extends SyncDelivery {
const continuation = fetchSegments(prefix, {
segmentNumConvention: SequenceNum,
segmentRange: [update.loSeqNum, update.hiSeqNum + 1],
retxLimit: 40,
lifetimeAfterRto: 1000, // The true timeout timer is the RTO
retxLimit: 80,
lifetimeAfterRto: 1000, // Lifetime = RTO + 1000
// The true timeout timer is the RTO, specified below
rtte: {
initRto: 1000,
minRto: 1000, // Minimal RTO is 1000
maxRto: 120000,
initRto: 50,
minRto: 50, // Minimal RTO is 50ms
maxRto: 2000,
},
ca: new LimitedCwnd(new TcpCubic(), 10),
verifier: this.verifier,
Expand Down
1 change: 1 addition & 0 deletions src/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ export class SyncAgent implements AsyncDisposable {
verifier: this.verifier,
modifyInterest: { mustBeFresh: true },
lifetimeAfterRto: 2000,
retxLimit: 25,
});
for await (const segment of result) {
// Cache packets
Expand Down

0 comments on commit 20df33c

Please sign in to comment.