Skip to content

Commit

Permalink
Fix AtLeastOnce delivery for now; Make is unordered.
Browse files Browse the repository at this point in the history
  • Loading branch information
zjkmxy committed Jan 26, 2024
1 parent 8a83f21 commit 1b3ddad
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 81 deletions.
69 changes: 45 additions & 24 deletions src/sync-agent/deliveries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ export abstract class SyncDelivery implements AsyncDisposable {
protected _onUpdate?: UpdateEvent;
private _startPromiseResolve?: () => void;
protected _onReset?: () => void;
protected abortController: AbortController;
protected _abortController: AbortController;
protected _lastTillNow: SvStateVector;

// TODO: Use options to configure parameters
constructor(
Expand All @@ -61,7 +62,8 @@ export abstract class SyncDelivery implements AsyncDisposable {
resolve();
}
});
this.abortController = new AbortController();
this._abortController = new AbortController();
this._lastTillNow = new SvStateVector(this.state);

SvSync.create({
endpoint: endpoint,
Expand Down Expand Up @@ -130,7 +132,7 @@ export abstract class SyncDelivery implements AsyncDisposable {
// Note: the abstract class does not know where is the storage to store SVS vector.
// Derived classes will override this with `destroy()`
this._ready = false;
this.abortController.abort('SvsDelivery Destroyed');
this._abortController.abort('SvsDelivery Destroyed');
if (this._syncInst !== undefined) {
this._syncInst.close();
if (storage !== undefined) {
Expand All @@ -150,8 +152,9 @@ export abstract class SyncDelivery implements AsyncDisposable {
throw new Error('Please do not reset before start.');
}
console.warn('A Sync reset is scheduled.');
this.abortController.abort('Reset');
this.abortController = new AbortController();
this._abortController.abort('Reset');
this._abortController = new AbortController();
this._lastTillNow = new SvStateVector(this.state);
this._syncInst.close();
this._syncNode = undefined;
const svSync = await SvSync.create({
Expand Down Expand Up @@ -212,7 +215,7 @@ export abstract class SyncDelivery implements AsyncDisposable {
}

// At least once delivery (closer to exactly once). Used for Y.Doc updates and large blob.
// This class does not handle segmentation.
// This class does not handle segmentation and reordering.
// Note: storage is not necessarily a real storage.
export class AtLeastOnceDelivery extends SyncDelivery {
constructor(
Expand All @@ -229,9 +232,10 @@ export class AtLeastOnceDelivery extends SyncDelivery {
}

override async handleSyncUpdate(update: SyncUpdate<Name>) {
// TODO: URGENT: No guarantee this function is single entry. Separate delivery thread with the fetcher.
// Note: No guarantee this function is single entry. Need to separate delivery thread with the fetcher.
// Updated: since it is hard to do so, I did a quick fix by dropping the ordering requirement.

const prefix = getNamespace().baseName(update.id, this.syncPrefix);
let lastHandled = update.loSeqNum - 1;
// Modify NDNts's segmented object fetching pipeline to fetch sequences.
// fetchSegments is not supposed to be working with sequence numbers, but I can abuse the convention
const continuation = fetchSegments(prefix, {
Expand All @@ -245,15 +249,12 @@ export class AtLeastOnceDelivery extends SyncDelivery {
// WARN: an abort controller is required! NDNts's fetcher cannot close itself even after
// the face is destroyed and there exists no way to send the Interest.
// deno-lint-ignore no-explicit-any
signal: this.abortController.signal as any,
signal: this._abortController.signal as any,
});
let lastHandled: number | undefined;
try {
for await (const data of continuation) {
const i = data.name.get(data.name.length - 1)?.as(SequenceNum);
if (i !== lastHandled + 1) {
throw new Error(`[FATAL] sync update error: seq=${i} is processed before seq=${lastHandled + 1}`);
}

for await (const data of continuation.unordered()) {
lastHandled = data.name.get(data.name.length - 1)?.as(SequenceNum);
// Put into storage
// Note: even though endpoint.consume does not give me the raw Data packet,
// the encode result will be the same.
Expand All @@ -263,9 +264,6 @@ export class AtLeastOnceDelivery extends SyncDelivery {
// AtLeastOnce is required to have the callback acknowledged
// before writing the new SvStateVector into the storage
await this._onUpdate!(data.content, update.id, this);

// Mark as persist
lastHandled = i;
}
} catch (error) {
// If it is due to destroy, silently shutdown.
Expand All @@ -274,7 +272,7 @@ export class AtLeastOnceDelivery extends SyncDelivery {
}

// TODO: Find a better way to handle this
console.error(`Unable to fetch or verify ${prefix}:${lastHandled + 1} due to: `, error);
console.error(`Unable to fetch or verify ${prefix}:${lastHandled} due to: `, error);
console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered');

this._syncInst?.close();
Expand All @@ -289,11 +287,34 @@ export class AtLeastOnceDelivery extends SyncDelivery {
return;
}

// Putting this out of the loop makes it not exactly once:
// If the application is down before all messages in the update is handled,
// some may be redelivered the next time the application starts.
// Sinc Yjs allows an update to be applied multiple times, this should be fine.
await this.setSyncState(update.id, lastHandled, this.storage);
// // Putting this out of the loop makes it not exactly once:
// // If the application is down before all messages in the update is handled,
// // some may be redelivered the next time the application starts.
// // Sinc Yjs allows an update to be applied multiple times, this should be fine.
// await this.setSyncState(update.id, lastHandled, this.storage);

// Updated: the original design contravened the AtLeastOnce guarantee due to gap and multi-entry of this function.
let lastSeen = this._lastTillNow.get(update.id);
if (lastSeen < update.hiSeqNum) {
this._lastTillNow.set(update.id, update.hiSeqNum);
lastSeen = update.hiSeqNum;
}
const front = this.syncState.get(update.id);
// We don't know if update is next to front. If there is a gap, we must not do anything.
if (update.loSeqNum > front + 1) {
return;
}
// Otherwise, we move to hiSeqNum first, and check if there is anything further.
let newSeq = update.hiSeqNum;
const updateBaseName = getNamespace().baseName(update.id, this.syncPrefix);
for (; newSeq < lastSeen; newSeq++) {
// This can be optimized with some data structure like C++ set, but not now.
const dataName = updateBaseName.append(SequenceNum.create(newSeq + 1));
if (!this.storage.has(dataName.toString())) {
break;
}
}
await this.setSyncState(update.id, newSeq, this.storage);
}

override async produce(content: Uint8Array) {
Expand Down
148 changes: 91 additions & 57 deletions src/sync-agent/delivery-alo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Forwarder } from '@ndn/fw';
import { Data, digestSigning, Name } from '@ndn/packet';
import { GenericNumber } from '@ndn/naming-convention2';
import { Encoder } from '@ndn/tlv';
import { assert } from '../dep.ts';
import { assert, hex } from '../dep.ts';
import { AtLeastOnceDelivery, SyncDelivery } from './deliveries.ts';
import { AsyncDisposableStack, name, Responder } from '../utils/mod.ts';
import { InMemoryStorage } from '../storage/mod.ts';
Expand Down Expand Up @@ -98,6 +98,20 @@ class DeliveryTester implements AsyncDisposable {
}
}

const eventToKey = (a: SyncUpdateEvent) => `${a.receiver}-${a.origin}-${hex.encodeHex(a.content)}`;

const compareEvent = (a: SyncUpdateEvent, b: SyncUpdateEvent) => {
const keyA = eventToKey(a);
const keyB = eventToKey(b);
if (keyA < keyB) {
return -1;
} else if (keyA > keyB) {
return 1;
} else {
return 0;
}
};

Deno.test('basic test', async () => {
let eventSet;
{
Expand All @@ -110,76 +124,96 @@ Deno.test('basic test', async () => {
});
await tester.start(2000);

await tester.alos[1].produce(new TextEncoder().encode('Hello'));
await tester.alos[1].produce(new TextEncoder().encode('World'));
await tester.alos[1].produce(new TextEncoder().encode('0-Hello'));
await tester.alos[1].produce(new TextEncoder().encode('1-World'));
await new Promise((resolve) => setTimeout(resolve, 100));

await stopSignal;
eventSet = tester.events;
}

// Since it is unordered, we have to sort
eventSet.sort(compareEvent);
assert.assertEquals(eventSet.length, 2);
assert.assertEquals(eventSet[0], {
content: new TextEncoder().encode('Hello'),
content: new TextEncoder().encode('0-Hello'),
origin: 1,
receiver: 0,
});
assert.assertEquals(eventSet[1], {
content: new TextEncoder().encode('1-World'),
origin: 1,
receiver: 0,
});
});

Deno.test('no missing due to parallel', async () => {
let eventSet;
{
const { promise: stopSignal1, resolve: stop1 } = Promise.withResolvers<void>();
const { promise: stopSignal2, resolve: stop2 } = Promise.withResolvers<void>();
await using tester = new DeliveryTester(2, () => {
if (tester.events.length === 2) {
stop1();
} else if (tester.events.length === 4) {
stop2();
}
return Promise.resolve();
});
await tester.start(3000);

// Make a hanging trigger without actual data
tester.alos[1].syncNode!.seqNum = 2;
await new Promise((resolve) => setTimeout(resolve, 100));
// Then generate two normal updates. They are fetched first.
await tester.alos[1].produce(new TextEncoder().encode('C'));
await tester.alos[1].produce(new TextEncoder().encode('D'));

await stopSignal1;
// For now, the state must not be set
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 0);
// But the data should be delivered
assert.assertEquals(tester.events.length, 2);

// Finally make up those missing data.
await tester.dispositData(1, 1, new TextEncoder().encode('A'));
await tester.dispositData(1, 2, new TextEncoder().encode('B'));
// Wait for retransmission Interest.
await new Promise((resolve) => setTimeout(resolve, 1500));

await stopSignal2;
eventSet = tester.events;

// At last, the state should be updated
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 4);
}

// Since it is unordered, we have to sort
eventSet.sort(compareEvent);
assert.assertEquals(eventSet.length, 4);
assert.assertEquals(eventSet[0], {
content: new TextEncoder().encode('A'),
origin: 1,
receiver: 0,
});
assert.assertEquals(eventSet[1], {
content: new TextEncoder().encode('World'),
content: new TextEncoder().encode('B'),
origin: 1,
receiver: 0,
});
assert.assertEquals(eventSet[2], {
content: new TextEncoder().encode('C'),
origin: 1,
receiver: 0,
});
assert.assertEquals(eventSet[3], {
content: new TextEncoder().encode('D'),
origin: 1,
receiver: 0,
});
});

// TODO: Known failure
// Deno.test('no missing due to parallel', async () => {
// let eventSet;
// {
// const { promise: stopSignal, resolve: stop } = Promise.withResolvers<void>();
// await using tester = new DeliveryTester(2, () => {
// if (tester.events.length === 2) {
// stop();
// }
// return Promise.resolve();
// });
// await tester.start(3000);

// // Make a hanging trigger without actual data
// tester.alos[1].syncNode!.seqNum = 2;
// await new Promise((resolve) => setTimeout(resolve, 100));
// // Then generate two normal updates. They are fetched first.
// await tester.alos[1].produce(new TextEncoder().encode('C'));
// await tester.alos[1].produce(new TextEncoder().encode('D'));
// await new Promise((resolve) => setTimeout(resolve, 100));
// // Finally make up those missing data.
// await tester.dispositData(1, 1, new TextEncoder().encode('A'));
// await tester.dispositData(1, 2, new TextEncoder().encode('B'));
// // Wait for retransmission Interest.
// await new Promise((resolve) => setTimeout(resolve, 1500));

// await stopSignal;
// eventSet = tester.events;
// }

// assert.assertEquals(eventSet.length, 4);
// assert.assertEquals(eventSet[0], {
// content: new TextEncoder().encode('A'),
// origin: 1,
// receiver: 0,
// });
// assert.assertEquals(eventSet[1], {
// content: new TextEncoder().encode('B'),
// origin: 1,
// receiver: 0,
// });
// assert.assertEquals(eventSet[2], {
// content: new TextEncoder().encode('C'),
// origin: 1,
// receiver: 0,
// });
// assert.assertEquals(eventSet[3], {
// content: new TextEncoder().encode('D'),
// origin: 1,
// receiver: 0,
// });
// });
// Test recover after shutdown and replay
// Test crash during onUpdate
// Test unverified state interest
// Test unverified data
35 changes: 35 additions & 0 deletions src/utils/async-trigger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
export class AsyncTrigger implements Disposable {
#promise: Promise<boolean>;
#resolve: (value: boolean) => void;
#abortListener;

constructor(
readonly abortSignal: AbortSignal,
) {
({ promise: this.#promise, resolve: this.#resolve } = Promise.withResolvers<boolean>());
this.#abortListener = () => {
this.#resolve(false);
};
abortSignal.addEventListener('abort', this.#abortListener, { once: true });
}

[Symbol.dispose](): void {
this.abortSignal.removeEventListener('abort', this.#abortListener);
this.#resolve(false);
}

async *[Symbol.asyncIterator]() {
while (!this.abortSignal.aborted) {
const value = await this.#promise;
if (this.abortSignal.aborted || !value) {
return;
}
({ promise: this.#promise, resolve: this.#resolve } = Promise.withResolvers<boolean>());
yield value;
}
}

trigger() {
this.#resolve(true);
}
}

0 comments on commit 1b3ddad

Please sign in to comment.