Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NDNts-aux snapshot #6

Merged
merged 15 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ucla-irl/ndnts-aux",
"version": "3.0.3",
"version": "4.0.0",
"description": "NDNts Auxiliary Package for Web and Deno",
"scripts": {
"test": "deno test --no-check",
Expand Down
162 changes: 160 additions & 2 deletions src/adaptors/yjs-ndn-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import * as Y from 'yjs';
import { Awareness } from 'y-protocols/awareness.js';
import { Bundler } from './bundler.ts';

// Adam Chen Additional Imports
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
import { Decoder, Encoder } from '@ndn/tlv';
import { Component, Data, Name } from '@ndn/packet';
import { Version } from '@ndn/naming-convention2';
import { StateVector } from '@ndn/svs';

/**
* NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel.
*
Expand Down Expand Up @@ -33,11 +39,24 @@ export class NdnSvsAdaptor {
useBundler: boolean = false,
) {
syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content));
// Adam Chen callback on receiving a snapshot blob for Injection Point 3
syncAgent.register('blob', 'snapshot', (content) => this.handleSnapshotUpdate(content));
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
doc.on('update', this.callback);
if (useBundler) {
// this.#bundler = new Bundler(
// Y.mergeUpdates,
// (content) => this.syncAgent.publishUpdate(this.topic, content),
// {
// thresholdSize: 3000,
// delayMs: 400,
// maxDelayMs: 1600,
// },
// );

// Adam Chen Injection Point 1 override
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
this.#bundler = new Bundler(
Y.mergeUpdates,
(content) => this.syncAgent.publishUpdate(this.topic, content),
(content) => this.publishUpdate(this.topic, content),
{
thresholdSize: 3000,
delayMs: 400,
Expand Down Expand Up @@ -96,9 +115,148 @@ export class NdnSvsAdaptor {
if (this.#bundler) {
await this.#bundler.produce(content);
} else {
await this.syncAgent.publishUpdate(this.topic, content);
// await this.syncAgent.publishUpdate(this.topic, content);

// Adam Chen Injection point 1 override
await this.publishUpdate(this.topic, content);
}
}

// Adam Chen Injection point 1
private async publishUpdate(topic: string, content: Uint8Array) {
await this.syncAgent.publishUpdate(topic, content);
// await new Promise(r => setTimeout(r,500));
// forced wait so that publishUpdate() is completed before we check SV.
console.log('-- Injection point 1: Check StateVector / Create Snapshot --');
const stateVector = this.syncAgent.getUpdateSyncSV();
console.log('debug: stateVector object: ', stateVector);
let count = 0;
for (const [_id, seq] of stateVector) {
count += seq;
}
console.log('Total count of state vector', count);
console.log('The above number should match the state vector in the debug page');
// Snapshot Interval configuration: Currently hard-coded
// TODO: make the interval configurable
if (count % 5 == 0) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
CAWorks-ChrisA marked this conversation as resolved.
Show resolved Hide resolved
console.log("It's time to make a snapshot!");
console.log('debug: group prefix: ', this.syncAgent.appPrefix.toString());
const encodedSV = Encoder.encode(stateVector);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// TODO: Currently naming convention is hard-coded. May need organizing.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');
// New SVS encodings
const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV));
console.log('debug: targeted snapshot Prefix (persistStore key): ', snapshotPrefix.toString());
// /groupPrefix/32=snapshot/
console.log('debug: targeted snapshot Name: ', snapshotName.toString());
// /groupPrefix/32=snapshot/54=<stateVector>
const decodedSV = Decoder.decode(snapshotName.at(-1).value, StateVector);
console.log('debug: decoding encoded SV from snapshotName: ', decodedSV);
let count = 0;
for (const [_id, seq] of decodedSV) {
count += seq;
}
console.log('debug: decoding encoded SV total packet count: ', count);
console.log('This should match the state vector in the debug page and the previous count before encoding');

// Snapshot content generation
const content = Y.encodeStateAsUpdate(this.doc);
// its already in UInt8Array (binary), transporting currently without any additional encoding.
console.log('yjs backend data: ', content);

// use syncAgent's blob and publish mechanism
await this.syncAgent.publishBlob('snapshot', content, snapshotName, true);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// Race Condition note: Testing suggests that the write above with publishBlob()
// is near certainly done before the read happens below.
// Hence no delay is added.
// first segmented object is at /50=%00
const firstSegmentName = snapshotName.append('50=%00').toString();
console.log('debugTargetName: ', firstSegmentName);
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
console.log('persistentStore check: ', firstSegmentPacket);
console.log('persistentStore check Data Name:', firstSegmentPacket.name.toString());
await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
// End Injection point 1

// -- Adam Chen Injection Point 3: HandleSnapshotUpdate --
async handleSnapshotUpdate(snapshotName: Uint8Array) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
// Maybe it's wise to put this under a try() because it might fail due to network issues.
const decodedSnapshotName = Decoder.decode(snapshotName, Name);
console.log('-- Adam Chen Injection Point 3: Update Latest Snapshot (Received) --');
console.log('Handling received snapshot packet with name: ', decodedSnapshotName.toString());

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');
// /groupPrefix/32=snapshot/
console.log('snapshot prefix in persistStorage: ', snapshotPrefix.toString());

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString());
let oldSVCount = 0;
if (oldSnapshotFirstSegmentEncoded) {
const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data);
const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value, StateVector);
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
for (const [_id, seq] of oldSnapshotVector) {
oldSVCount += seq;
}
}

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value, StateVector);
let snapshotSVcount = 0;
for (const [_id, seq] of snapshotSV) {
snapshotSVcount += seq;
}

// debug
console.log('current state vector total count: ', oldSVCount);
console.log('snapshot state vector total count: ', snapshotSVcount);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
if (snapshotSVcount > oldSVCount) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
const firstSegmentName = decodedSnapshotName.append('50=%00').toString();
console.log('Retrieving the following from persist Storage: ', firstSegmentName);
// Race Condition Note: The callback to here is faster than
// fetchBlob() finish writing to persistStore.
// (in syncAgent before listener callback to here)
// Tested getBlob() to guarantee item arrival
// But ends up having multiple active sessions of fetchBlob(). bad.
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
// Hence a delay of 1 second.
// await this.syncAgent.getBlob(decodedSnapshotName)
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
await new Promise((r) => setTimeout(r, 1000));
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
console.log('Debug: Retrieval results: ', firstSegmentPacketEncoded);
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
console.log('Writing this packet: ', firstSegmentPacket.name.toString());
console.log('To this location: ', snapshotPrefix.toString());
// utilize snapshotPrefix above, with the same namingConvention warning.
// this is done to update the key of the prefix so program return latest when blind fetching.
this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
// should set snapshotPrefix to the newest packet.
} else {
console.log('PersistentStorage doesnt have the snapshot yet. Skipping update.');
// If the above race condition fails (reads before data arrives),
// 'endpoint's blind fetch mechanism' is not updated to latest, should be fine.
}
}
}
// End Injection point 3

public handleSyncUpdate(content: Uint8Array) {
// Apply patch
Expand Down
24 changes: 23 additions & 1 deletion src/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as endpoint from '@ndn/endpoint';
import type { Forwarder } from '@ndn/fw';
import { Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Component, Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Decoder, Encoder } from '@ndn/tlv';
import { BufferChunkSource, DataProducer, fetch } from '@ndn/segmented-object';
import { concatBuffers, fromHex } from '@ndn/util';
Expand Down Expand Up @@ -260,6 +260,8 @@ export class SyncAgent implements AsyncDisposable {
}

const buffers: Uint8Array[] = [];
//Adam Chen Debug
console.log('SyncAgent fetchBlob fetching: ', blobName.toString());
try {
const result = fetch(blobName, {
verifier: this.verifier,
Expand All @@ -282,6 +284,8 @@ export class SyncAgent implements AsyncDisposable {

// Save blob (SA getBlob())
await this.persistStorage.set(blobName.toString(), blob);
//Adam Chen Debug
console.log('SyncAgent fetchBlob complete.');
}

public register(channel: ChannelType, topic: string, handler: (content: Uint8Array, id: Name) => void) {
Expand Down Expand Up @@ -354,6 +358,24 @@ export class SyncAgent implements AsyncDisposable {

async serve(interest: Interest) {
const intName = interest.name;

// -- Adam Chen Injection point 2 --
// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) {
// console.log('snapshot interest detected, custom routine activated')
const wire = await this.persistStorage.get(intName.toString());
if (wire === undefined || wire.length === 0) {
// console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`);
console.log('MISS: SnapshotInterest: ', intName.toString());
return undefined;
}
const data = Decoder.decode(wire, Data);
console.log('HIT: SnapshotInterest and Returned Data: ', intName.toString(), data.name.toString());
return data;
}
// -- End Injection point 2 --

if (intName.length <= this.appPrefix.length + 1) {
// The name should be at least two components plus app prefix
return undefined;
Expand Down