Skip to content

Commit

Permalink
NDNts-aux snapshot (#6)
Browse files Browse the repository at this point in the history
Initial Snapshot code.
  • Loading branch information
CAWorks-ChrisA authored Oct 8, 2024
1 parent 3064dac commit edcff4c
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 4 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ucla-irl/ndnts-aux",
"version": "3.0.5",
"version": "4.0.0",
"description": "NDNts Auxiliary Package for Web and Deno",
"scripts": {
"test": "deno test --no-check",
Expand Down
110 changes: 108 additions & 2 deletions src/adaptors/yjs-ndn-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { SyncAgent } from '../sync-agent/mod.ts';
import * as Y from 'yjs';
import { Awareness } from 'y-protocols/awareness.js';
import { Bundler } from './bundler.ts';
import { Decoder, Encoder } from '@ndn/tlv';
import { Component, Data, Name } from '@ndn/packet';
import { Version } from '@ndn/naming-convention2';
import { StateVector } from '@ndn/svs';

/**
* NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel.
Expand Down Expand Up @@ -30,14 +34,17 @@ export class NdnSvsAdaptor {
public syncAgent: SyncAgent,
public readonly doc: Y.Doc,
public readonly topic: string,
public readonly snapshotTopic: string = 'snapshot',
public readonly snapshotFrequency: number = 10,
useBundler: boolean = false,
) {
syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content));
syncAgent.register('blob', snapshotTopic, (content) => this.handleSnapshotUpdate(content));
doc.on('update', this.callback);
if (useBundler) {
this.#bundler = new Bundler(
Y.mergeUpdates,
(content) => this.syncAgent.publishUpdate(this.topic, content),
(content) => this.publishUpdate(this.topic, content),
{
thresholdSize: 3000,
delayMs: 400,
Expand Down Expand Up @@ -96,7 +103,106 @@ export class NdnSvsAdaptor {
if (this.#bundler) {
await this.#bundler.produce(content);
} else {
await this.syncAgent.publishUpdate(this.topic, content);
await this.publishUpdate(this.topic, content);
}
}

private async publishUpdate(topic: string, content: Uint8Array) {
await this.syncAgent.publishUpdate(topic, content);

const stateVector = this.syncAgent.getUpdateSyncSV();
let count = 0;
for (const [_id, seq] of stateVector) {
count += seq;
}

if (count % this.snapshotFrequency == 0) {
const encodedSV = Encoder.encode(stateVector);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// NOTE: Currently naming convention is hard-coded. May need organizing.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');
// New SVS encodings
const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV));

// Snapshot content generation
const content = Y.encodeStateAsUpdate(this.doc);
// its already in UInt8Array (binary), transporting currently without any additional encoding.
// use syncAgent's blob and publish mechanism
await this.syncAgent.publishBlob('snapshot', content, snapshotName, true);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// Race Condition note: Testing suggests that the write above with publishBlob()
// is near certainly done before the read happens below.
// Hence no delay is added.
// first segmented object is at /50=%00
const firstSegmentName = snapshotName.append('50=%00').toString();
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
}
}
}

async handleSnapshotUpdate(snapshotName: Uint8Array) {
// Maybe it's wise to put this under a try() because it might fail due to network issues.
const decodedSnapshotName = Decoder.decode(snapshotName, Name);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString());
let oldSVCount = 0;
if (oldSnapshotFirstSegmentEncoded) {
const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data);
const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value, StateVector);
for (const [_id, seq] of oldSnapshotVector) {
oldSVCount += seq;
}
}

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value, StateVector);
let snapshotSVcount = 0;
for (const [_id, seq] of snapshotSV) {
snapshotSVcount += seq;
}

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// NOTE: From Github Discussion:
// Though, this "update the snapshot response strategy on receiving new snapshot from SVS" logic is somewhat optional in nature.
// It is ran, such that if a blind fetch request reaches an endpoint, endpoint returns a good response.
// Just like snapshot responses, we don't have to guarantee absolute latest when it is about blind fetching.
// hence we can just use a rough "total count" for determining if it needs an update.
if (snapshotSVcount > oldSVCount) {
const firstSegmentName = decodedSnapshotName.append('50=%00').toString();
// Race Condition Note: The callback to here is faster than
// fetchBlob() finish writing to persistStore.
// (in syncAgent before listener callback to here)
// Tested getBlob() to guarantee item arrival
// But ends up having multiple active sessions of fetchBlob(). bad.
// Hence a delay of 1 second.
await new Promise((r) => setTimeout(r, 1000));
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
// utilize snapshotPrefix above, with the same namingConvention warning.
// this is done to update the key of the prefix so program return latest when blind fetching.
this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
// should set snapshotPrefix to the newest packet.
} else {
console.debug('PersistentStorage doesnt have the snapshot yet. Skipping update.');
// If the above race condition fails (reads before data arrives),
// 'endpoint's blind fetch mechanism' is not updated to latest, should be fine.
}
}
}

Expand Down
15 changes: 14 additions & 1 deletion src/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as endpoint from '@ndn/endpoint';
import type { Forwarder } from '@ndn/fw';
import { Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Component, Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Decoder, Encoder } from '@ndn/tlv';
import { BufferChunkSource, DataProducer, fetch } from '@ndn/segmented-object';
import { concatBuffers, fromHex } from '@ndn/util';
Expand Down Expand Up @@ -353,6 +353,19 @@ export class SyncAgent implements AsyncDisposable {

async serve(interest: Interest) {
const intName = interest.name;

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) {
const wire = await this.persistStorage.get(intName.toString());
if (wire === undefined || wire.length === 0) {
// console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`);
return undefined;
}
const data = Decoder.decode(wire, Data);
return data;
}

if (intName.length <= this.appPrefix.length + 1) {
// The name should be at least two components plus app prefix
return undefined;
Expand Down

0 comments on commit edcff4c

Please sign in to comment.