From 1b3ddadf23add29286a671ddd8378b3fdc07f301 Mon Sep 17 00:00:00 2001 From: Xinyu Ma Date: Thu, 25 Jan 2024 23:15:06 -0800 Subject: [PATCH] Fix AtLeastOnce delivery for now; Make is unordered. --- src/sync-agent/deliveries.ts | 69 ++++++++----- src/sync-agent/delivery-alo.test.ts | 148 +++++++++++++++++----------- src/utils/async-trigger.ts | 35 +++++++ 3 files changed, 171 insertions(+), 81 deletions(-) create mode 100644 src/utils/async-trigger.ts diff --git a/src/sync-agent/deliveries.ts b/src/sync-agent/deliveries.ts index 9db068a..95a1afd 100644 --- a/src/sync-agent/deliveries.ts +++ b/src/sync-agent/deliveries.ts @@ -41,7 +41,8 @@ export abstract class SyncDelivery implements AsyncDisposable { protected _onUpdate?: UpdateEvent; private _startPromiseResolve?: () => void; protected _onReset?: () => void; - protected abortController: AbortController; + protected _abortController: AbortController; + protected _lastTillNow: SvStateVector; // TODO: Use options to configure parameters constructor( @@ -61,7 +62,8 @@ export abstract class SyncDelivery implements AsyncDisposable { resolve(); } }); - this.abortController = new AbortController(); + this._abortController = new AbortController(); + this._lastTillNow = new SvStateVector(this.state); SvSync.create({ endpoint: endpoint, @@ -130,7 +132,7 @@ export abstract class SyncDelivery implements AsyncDisposable { // Note: the abstract class does not know where is the storage to store SVS vector. // Derived classes will override this with `destroy()` this._ready = false; - this.abortController.abort('SvsDelivery Destroyed'); + this._abortController.abort('SvsDelivery Destroyed'); if (this._syncInst !== undefined) { this._syncInst.close(); if (storage !== undefined) { @@ -150,8 +152,9 @@ export abstract class SyncDelivery implements AsyncDisposable { throw new Error('Please do not reset before start.'); } console.warn('A Sync reset is scheduled.'); - this.abortController.abort('Reset'); - this.abortController = new AbortController(); + this._abortController.abort('Reset'); + this._abortController = new AbortController(); + this._lastTillNow = new SvStateVector(this.state); this._syncInst.close(); this._syncNode = undefined; const svSync = await SvSync.create({ @@ -212,7 +215,7 @@ export abstract class SyncDelivery implements AsyncDisposable { } // At least once delivery (closer to exactly once). Used for Y.Doc updates and large blob. -// This class does not handle segmentation. +// This class does not handle segmentation and reordering. // Note: storage is not necessarily a real storage. export class AtLeastOnceDelivery extends SyncDelivery { constructor( @@ -229,9 +232,10 @@ export class AtLeastOnceDelivery extends SyncDelivery { } override async handleSyncUpdate(update: SyncUpdate) { - // TODO: URGENT: No guarantee this function is single entry. Separate delivery thread with the fetcher. + // Note: No guarantee this function is single entry. Need to separate delivery thread with the fetcher. + // Updated: since it is hard to do so, I did a quick fix by dropping the ordering requirement. + const prefix = getNamespace().baseName(update.id, this.syncPrefix); - let lastHandled = update.loSeqNum - 1; // Modify NDNts's segmented object fetching pipeline to fetch sequences. // fetchSegments is not supposed to be working with sequence numbers, but I can abuse the convention const continuation = fetchSegments(prefix, { @@ -245,15 +249,12 @@ export class AtLeastOnceDelivery extends SyncDelivery { // WARN: an abort controller is required! NDNts's fetcher cannot close itself even after // the face is destroyed and there exists no way to send the Interest. // deno-lint-ignore no-explicit-any - signal: this.abortController.signal as any, + signal: this._abortController.signal as any, }); + let lastHandled: number | undefined; try { - for await (const data of continuation) { - const i = data.name.get(data.name.length - 1)?.as(SequenceNum); - if (i !== lastHandled + 1) { - throw new Error(`[FATAL] sync update error: seq=${i} is processed before seq=${lastHandled + 1}`); - } - + for await (const data of continuation.unordered()) { + lastHandled = data.name.get(data.name.length - 1)?.as(SequenceNum); // Put into storage // Note: even though endpoint.consume does not give me the raw Data packet, // the encode result will be the same. @@ -263,9 +264,6 @@ export class AtLeastOnceDelivery extends SyncDelivery { // AtLeastOnce is required to have the callback acknowledged // before writing the new SvStateVector into the storage await this._onUpdate!(data.content, update.id, this); - - // Mark as persist - lastHandled = i; } } catch (error) { // If it is due to destroy, silently shutdown. @@ -274,7 +272,7 @@ export class AtLeastOnceDelivery extends SyncDelivery { } // TODO: Find a better way to handle this - console.error(`Unable to fetch or verify ${prefix}:${lastHandled + 1} due to: `, error); + console.error(`Unable to fetch or verify ${prefix}:${lastHandled} due to: `, error); console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered'); this._syncInst?.close(); @@ -289,11 +287,34 @@ export class AtLeastOnceDelivery extends SyncDelivery { return; } - // Putting this out of the loop makes it not exactly once: - // If the application is down before all messages in the update is handled, - // some may be redelivered the next time the application starts. - // Sinc Yjs allows an update to be applied multiple times, this should be fine. - await this.setSyncState(update.id, lastHandled, this.storage); + // // Putting this out of the loop makes it not exactly once: + // // If the application is down before all messages in the update is handled, + // // some may be redelivered the next time the application starts. + // // Sinc Yjs allows an update to be applied multiple times, this should be fine. + // await this.setSyncState(update.id, lastHandled, this.storage); + + // Updated: the original design contravened the AtLeastOnce guarantee due to gap and multi-entry of this function. + let lastSeen = this._lastTillNow.get(update.id); + if (lastSeen < update.hiSeqNum) { + this._lastTillNow.set(update.id, update.hiSeqNum); + lastSeen = update.hiSeqNum; + } + const front = this.syncState.get(update.id); + // We don't know if update is next to front. If there is a gap, we must not do anything. + if (update.loSeqNum > front + 1) { + return; + } + // Otherwise, we move to hiSeqNum first, and check if there is anything further. + let newSeq = update.hiSeqNum; + const updateBaseName = getNamespace().baseName(update.id, this.syncPrefix); + for (; newSeq < lastSeen; newSeq++) { + // This can be optimized with some data structure like C++ set, but not now. + const dataName = updateBaseName.append(SequenceNum.create(newSeq + 1)); + if (!this.storage.has(dataName.toString())) { + break; + } + } + await this.setSyncState(update.id, newSeq, this.storage); } override async produce(content: Uint8Array) { diff --git a/src/sync-agent/delivery-alo.test.ts b/src/sync-agent/delivery-alo.test.ts index 3915f87..3814de1 100644 --- a/src/sync-agent/delivery-alo.test.ts +++ b/src/sync-agent/delivery-alo.test.ts @@ -3,7 +3,7 @@ import { Forwarder } from '@ndn/fw'; import { Data, digestSigning, Name } from '@ndn/packet'; import { GenericNumber } from '@ndn/naming-convention2'; import { Encoder } from '@ndn/tlv'; -import { assert } from '../dep.ts'; +import { assert, hex } from '../dep.ts'; import { AtLeastOnceDelivery, SyncDelivery } from './deliveries.ts'; import { AsyncDisposableStack, name, Responder } from '../utils/mod.ts'; import { InMemoryStorage } from '../storage/mod.ts'; @@ -98,6 +98,20 @@ class DeliveryTester implements AsyncDisposable { } } +const eventToKey = (a: SyncUpdateEvent) => `${a.receiver}-${a.origin}-${hex.encodeHex(a.content)}`; + +const compareEvent = (a: SyncUpdateEvent, b: SyncUpdateEvent) => { + const keyA = eventToKey(a); + const keyB = eventToKey(b); + if (keyA < keyB) { + return -1; + } else if (keyA > keyB) { + return 1; + } else { + return 0; + } +}; + Deno.test('basic test', async () => { let eventSet; { @@ -110,76 +124,96 @@ Deno.test('basic test', async () => { }); await tester.start(2000); - await tester.alos[1].produce(new TextEncoder().encode('Hello')); - await tester.alos[1].produce(new TextEncoder().encode('World')); + await tester.alos[1].produce(new TextEncoder().encode('0-Hello')); + await tester.alos[1].produce(new TextEncoder().encode('1-World')); await new Promise((resolve) => setTimeout(resolve, 100)); await stopSignal; eventSet = tester.events; } + // Since it is unordered, we have to sort + eventSet.sort(compareEvent); assert.assertEquals(eventSet.length, 2); assert.assertEquals(eventSet[0], { - content: new TextEncoder().encode('Hello'), + content: new TextEncoder().encode('0-Hello'), + origin: 1, + receiver: 0, + }); + assert.assertEquals(eventSet[1], { + content: new TextEncoder().encode('1-World'), + origin: 1, + receiver: 0, + }); +}); + +Deno.test('no missing due to parallel', async () => { + let eventSet; + { + const { promise: stopSignal1, resolve: stop1 } = Promise.withResolvers(); + const { promise: stopSignal2, resolve: stop2 } = Promise.withResolvers(); + await using tester = new DeliveryTester(2, () => { + if (tester.events.length === 2) { + stop1(); + } else if (tester.events.length === 4) { + stop2(); + } + return Promise.resolve(); + }); + await tester.start(3000); + + // Make a hanging trigger without actual data + tester.alos[1].syncNode!.seqNum = 2; + await new Promise((resolve) => setTimeout(resolve, 100)); + // Then generate two normal updates. They are fetched first. + await tester.alos[1].produce(new TextEncoder().encode('C')); + await tester.alos[1].produce(new TextEncoder().encode('D')); + + await stopSignal1; + // For now, the state must not be set + assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 0); + // But the data should be delivered + assert.assertEquals(tester.events.length, 2); + + // Finally make up those missing data. + await tester.dispositData(1, 1, new TextEncoder().encode('A')); + await tester.dispositData(1, 2, new TextEncoder().encode('B')); + // Wait for retransmission Interest. + await new Promise((resolve) => setTimeout(resolve, 1500)); + + await stopSignal2; + eventSet = tester.events; + + // At last, the state should be updated + assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 4); + } + + // Since it is unordered, we have to sort + eventSet.sort(compareEvent); + assert.assertEquals(eventSet.length, 4); + assert.assertEquals(eventSet[0], { + content: new TextEncoder().encode('A'), origin: 1, receiver: 0, }); assert.assertEquals(eventSet[1], { - content: new TextEncoder().encode('World'), + content: new TextEncoder().encode('B'), + origin: 1, + receiver: 0, + }); + assert.assertEquals(eventSet[2], { + content: new TextEncoder().encode('C'), + origin: 1, + receiver: 0, + }); + assert.assertEquals(eventSet[3], { + content: new TextEncoder().encode('D'), origin: 1, receiver: 0, }); }); -// TODO: Known failure -// Deno.test('no missing due to parallel', async () => { -// let eventSet; -// { -// const { promise: stopSignal, resolve: stop } = Promise.withResolvers(); -// await using tester = new DeliveryTester(2, () => { -// if (tester.events.length === 2) { -// stop(); -// } -// return Promise.resolve(); -// }); -// await tester.start(3000); - -// // Make a hanging trigger without actual data -// tester.alos[1].syncNode!.seqNum = 2; -// await new Promise((resolve) => setTimeout(resolve, 100)); -// // Then generate two normal updates. They are fetched first. -// await tester.alos[1].produce(new TextEncoder().encode('C')); -// await tester.alos[1].produce(new TextEncoder().encode('D')); -// await new Promise((resolve) => setTimeout(resolve, 100)); -// // Finally make up those missing data. -// await tester.dispositData(1, 1, new TextEncoder().encode('A')); -// await tester.dispositData(1, 2, new TextEncoder().encode('B')); -// // Wait for retransmission Interest. -// await new Promise((resolve) => setTimeout(resolve, 1500)); - -// await stopSignal; -// eventSet = tester.events; -// } - -// assert.assertEquals(eventSet.length, 4); -// assert.assertEquals(eventSet[0], { -// content: new TextEncoder().encode('A'), -// origin: 1, -// receiver: 0, -// }); -// assert.assertEquals(eventSet[1], { -// content: new TextEncoder().encode('B'), -// origin: 1, -// receiver: 0, -// }); -// assert.assertEquals(eventSet[2], { -// content: new TextEncoder().encode('C'), -// origin: 1, -// receiver: 0, -// }); -// assert.assertEquals(eventSet[3], { -// content: new TextEncoder().encode('D'), -// origin: 1, -// receiver: 0, -// }); -// }); +// Test recover after shutdown and replay +// Test crash during onUpdate +// Test unverified state interest +// Test unverified data diff --git a/src/utils/async-trigger.ts b/src/utils/async-trigger.ts new file mode 100644 index 0000000..127db4c --- /dev/null +++ b/src/utils/async-trigger.ts @@ -0,0 +1,35 @@ +export class AsyncTrigger implements Disposable { + #promise: Promise; + #resolve: (value: boolean) => void; + #abortListener; + + constructor( + readonly abortSignal: AbortSignal, + ) { + ({ promise: this.#promise, resolve: this.#resolve } = Promise.withResolvers()); + this.#abortListener = () => { + this.#resolve(false); + }; + abortSignal.addEventListener('abort', this.#abortListener, { once: true }); + } + + [Symbol.dispose](): void { + this.abortSignal.removeEventListener('abort', this.#abortListener); + this.#resolve(false); + } + + async *[Symbol.asyncIterator]() { + while (!this.abortSignal.aborted) { + const value = await this.#promise; + if (this.abortSignal.aborted || !value) { + return; + } + ({ promise: this.#promise, resolve: this.#resolve } = Promise.withResolvers()); + yield value; + } + } + + trigger() { + this.#resolve(true); + } +}