From af959268b8245bbb46d792dff8a4f399422a9811 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Tue, 13 Aug 2024 21:47:29 -0700 Subject: [PATCH 01/11] Initial Snapshot code. Injection Parts 1,2,3 with lots of debug + a version bump. --- package.json | 2 +- src/adaptors/yjs-ndn-adaptor.ts | 134 +++++++++++++++++++++++++++++++- src/sync-agent/namespace.ts | 5 ++ src/sync-agent/sync-agent.ts | 22 ++++++ 4 files changed, 160 insertions(+), 3 deletions(-) diff --git a/package.json b/package.json index 14a7bf1..bf55e5f 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ucla-irl/ndnts-aux", - "version": "3.0.3", + "version": "4.0.0", "description": "NDNts Auxiliary Package for Web and Deno", "scripts": { "test": "deno test --no-check", diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index bb35d5d..3cbe099 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -3,6 +3,12 @@ import * as Y from 'yjs'; import { Awareness } from 'y-protocols/awareness.js'; import { Bundler } from './bundler.ts'; +// Adam Chen Additional Imports +import { Decoder, Encoder } from '@ndn/tlv'; +import { Component, Data, Name} from '@ndn/packet'; +import { Version } from '@ndn/naming-convention2'; +import { StateVector } from '@ndn/svs'; + /** * NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel. * @@ -33,11 +39,24 @@ export class NdnSvsAdaptor { useBundler: boolean = false, ) { syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content)); + // Adam Chen callback on receiving a snapshot blob for Injection Point 3 + syncAgent.register('blob','snapshot',(content) => this.handleSnapshotUpdate(content)) doc.on('update', this.callback); if (useBundler) { + // this.#bundler = new Bundler( + // Y.mergeUpdates, + // (content) => this.syncAgent.publishUpdate(this.topic, content), + // { + // thresholdSize: 3000, + // delayMs: 400, + // maxDelayMs: 1600, + // }, + // ); + + // Adam Chen Injection Point 1 override this.#bundler = new Bundler( Y.mergeUpdates, - (content) => this.syncAgent.publishUpdate(this.topic, content), + (content) => this.publishUpdate(this.topic, content), { thresholdSize: 3000, delayMs: 400, @@ -96,10 +115,121 @@ export class NdnSvsAdaptor { if (this.#bundler) { await this.#bundler.produce(content); } else { - await this.syncAgent.publishUpdate(this.topic, content); + // await this.syncAgent.publishUpdate(this.topic, content); + + // Adam Chen Injection point 1 override + await this.publishUpdate(this.topic,content) } } + // Adam Chen Injection point 1 + async publishUpdate(topic:any,content:any) { + await this.syncAgent.publishUpdate(topic, content) + // await new Promise(r => setTimeout(r,500)); + // forced wait so that publishUpdate() is completed before we check SV. + console.log('-- Injection point 1: Check StateVector / Create Snapshot --') + let stateVector = this.syncAgent.getUpdateSyncSV() + console.log('debug: stateVector object: ',stateVector) + let count = 0 + for (const [id,seq] of stateVector){ + count += seq + } + console.log('Total count of state vector', count) + console.log('The above number should match the state vector in the debug page') + if (count % 5 == 0){ + console.log('It\'s time to make a snapshot!') + console.log('debug: group prefix: ', this.syncAgent.appPrefix.toString()) + let encodedSV = Encoder.encode(stateVector) + let snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") + // New SVS encodings + let snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)) + console.log('debug: targeted snapshot Prefix (persistStore key): ', snapshotPrefix.toString()) + // /groupPrefix/32=snapshot/ + console.log('debug: targeted snapshot Name: ', snapshotName.toString()) + // /groupPrefix/32=snapshot/54= + let decodedSV = Decoder.decode(snapshotName.at(-1).value, StateVector) + console.log('debug: decoding encoded SV from snapshotName: ', decodedSV) + let count = 0 + for (const [id,seq] of decodedSV){ + count += seq + } + console.log('debug: decoding encoded SV total packet count: ', count) + console.log('This should match the state vector in the debug page and the previous count before encoding') + + let content = Y.encodeStateAsUpdate(this.doc) + // its already in UTF8, transporting currently without any additional encoding. + console.log('yjs backend data: ',content) + + // use syncAgent's blob and publish mechanism - use a different topic. + + await this.syncAgent.publishBlob('snapshot',content,snapshotName,true) + + //first segmented object is at /50=%00 + let firstSegmentName = snapshotName.append('50=%00').toString() + console.log('debugTargetName: ', firstSegmentName) + let firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) + if (firstSegmentPacketEncoded){ + let firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded,Data) + console.log('persistentStore check: ', firstSegmentPacket) + console.log('persistentStore check Data Name:', firstSegmentPacket.name.toString()) + await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); + } + + } +} +// End Injection point 1 + +// -- Adam Chen Injection Point 3: HandleSnapshotUpdate -- +async handleSnapshotUpdate(snapshotName: Uint8Array){ + // Maybe it's wise to put this under a try() because it might fail due to network issues. + let decodedSnapshotName = Decoder.decode(snapshotName, Name) + console.log('-- Adam Chen Injection Point 3: Update Latest Snapshot (Received) --') + console.log('Handling received snapshot packet with name: ', decodedSnapshotName.toString()) + + let snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") + console.log('snapshot prefix in persistStorage: ', snapshotPrefix.toString()) + let oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString()) + let oldSVCount = 0 + if (oldSnapshotFirstSegmentEncoded){ + let oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data) + let oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value,StateVector) + for (const [id,seq] of oldSnapshotVector){ + oldSVCount += seq + } + } + + let snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value,StateVector) + let snapshotSVcount = 0 + for (const [id,seq] of snapshotSV){ + snapshotSVcount += seq + } + + console.log('current state vector total count: ', oldSVCount) + console.log('snapshot state vector total count: ', snapshotSVcount) + + if (snapshotSVcount>oldSVCount){ + let firstSegmentName = decodedSnapshotName.append('50=%00').toString() + console.log('Retrieving the following from persist Storage: ', firstSegmentName) + // await this.syncAgent.getBlob(decodedSnapshotName) + await new Promise((r) => setTimeout(r, 1000)) + let firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) + if (firstSegmentPacketEncoded){ + console.log('Debug: Retrieval results: ', firstSegmentPacketEncoded) + let firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data) + console.log('Writing this packet: ', firstSegmentPacket.name.toString()) + console.log('To this location: ', snapshotPrefix.toString()) + // this is done to update the key of the prefix so program return latest when blind fetching. + this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); + // should set snapshotPrefix to the newest packet. + } + else { + console.log('PersistentStorage doesnt have the snapshot yet. Skipping update.') + } + + } +} +// End Injection point 3 + public handleSyncUpdate(content: Uint8Array) { // Apply patch // Remark: `applyUpdate` will trigger a transaction after the update is decoded. diff --git a/src/sync-agent/namespace.ts b/src/sync-agent/namespace.ts index e9f70c4..3956e89 100644 --- a/src/sync-agent/namespace.ts +++ b/src/sync-agent/namespace.ts @@ -53,6 +53,9 @@ export type SyncAgentNamespace = { /** Keyword component for latest only delivery. Default is `32=late` */ readonly latestOnlyKeyword: Component; + + /** Adam Chen: Keyword component for snapshots. Default is `32=snapshot` */ + readonly snapshotKeyword: Component; }; export function getNamespace(): SyncAgentNamespace { @@ -103,6 +106,8 @@ function createDefaultNamespace(): SyncAgentNamespace { syncKeyword: KeywordComponent.create('sync'), atLeastOnceKeyword: KeywordComponent.create('alo'), latestOnlyKeyword: KeywordComponent.create('late'), + // Adam Chen Add snapshotKeyword + snapshotKeyword: KeywordComponent.create('snapshot'), }; return ret; } diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index f265d7b..b8ed134 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -260,6 +260,8 @@ export class SyncAgent implements AsyncDisposable { } const buffers: Uint8Array[] = []; + //Adam Chen Debug + console.log('SyncAgent fetchBlob fetching: ', blobName.toString()) try { const result = fetch(blobName, { verifier: this.verifier, @@ -282,6 +284,8 @@ export class SyncAgent implements AsyncDisposable { // Save blob (SA getBlob()) await this.persistStorage.set(blobName.toString(), blob); + //Adam Chen Debug + console.log('SyncAgent fetchBlob complete.') } public register(channel: ChannelType, topic: string, handler: (content: Uint8Array, id: Name) => void) { @@ -354,6 +358,24 @@ export class SyncAgent implements AsyncDisposable { async serve(interest: Interest) { const intName = interest.name; + + // -- Adam Chen Injection point 2 -- + + if (intName.get(this.appPrefix.length)?.equals(getNamespace().snapshotKeyword)) { + // console.log('snapshot interest detected, custom routine activated') + let wire = await this.persistStorage.get(intName.toString()) + if (wire === undefined || wire.length === 0) { + // console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`); + console.log('MISS: SnapshotInterest: ', intName.toString()) + return undefined; + } + let data = Decoder.decode(wire, Data) + console.log('HIT: SnapshotInterest and Returned Data: ',intName.toString(),data.name.toString()) + return data + } + + // -- End Injection point 2 -- + if (intName.length <= this.appPrefix.length + 1) { // The name should be at least two components plus app prefix return undefined; From 8102741dd6370a1268a7213c855eb4aa15d26c2f Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Tue, 13 Aug 2024 23:08:22 -0700 Subject: [PATCH 02/11] Lint fixes 1 --- src/adaptors/yjs-ndn-adaptor.ts | 46 ++++++++++++++++----------------- src/sync-agent/sync-agent.ts | 4 +-- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index 3cbe099..ddfa7eb 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -123,15 +123,15 @@ export class NdnSvsAdaptor { } // Adam Chen Injection point 1 - async publishUpdate(topic:any,content:any) { + private async publishUpdate(topic:string,content:Uint8Array) { await this.syncAgent.publishUpdate(topic, content) // await new Promise(r => setTimeout(r,500)); // forced wait so that publishUpdate() is completed before we check SV. console.log('-- Injection point 1: Check StateVector / Create Snapshot --') - let stateVector = this.syncAgent.getUpdateSyncSV() + const stateVector = this.syncAgent.getUpdateSyncSV() console.log('debug: stateVector object: ',stateVector) let count = 0 - for (const [id,seq] of stateVector){ + for (const [_id,seq] of stateVector){ count += seq } console.log('Total count of state vector', count) @@ -139,24 +139,24 @@ export class NdnSvsAdaptor { if (count % 5 == 0){ console.log('It\'s time to make a snapshot!') console.log('debug: group prefix: ', this.syncAgent.appPrefix.toString()) - let encodedSV = Encoder.encode(stateVector) - let snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") + const encodedSV = Encoder.encode(stateVector) + const snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") // New SVS encodings - let snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)) + const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)) console.log('debug: targeted snapshot Prefix (persistStore key): ', snapshotPrefix.toString()) // /groupPrefix/32=snapshot/ console.log('debug: targeted snapshot Name: ', snapshotName.toString()) // /groupPrefix/32=snapshot/54= - let decodedSV = Decoder.decode(snapshotName.at(-1).value, StateVector) + const decodedSV = Decoder.decode(snapshotName.at(-1).value, StateVector) console.log('debug: decoding encoded SV from snapshotName: ', decodedSV) let count = 0 - for (const [id,seq] of decodedSV){ + for (const [_id,seq] of decodedSV){ count += seq } console.log('debug: decoding encoded SV total packet count: ', count) console.log('This should match the state vector in the debug page and the previous count before encoding') - let content = Y.encodeStateAsUpdate(this.doc) + const content = Y.encodeStateAsUpdate(this.doc) // its already in UTF8, transporting currently without any additional encoding. console.log('yjs backend data: ',content) @@ -165,11 +165,11 @@ export class NdnSvsAdaptor { await this.syncAgent.publishBlob('snapshot',content,snapshotName,true) //first segmented object is at /50=%00 - let firstSegmentName = snapshotName.append('50=%00').toString() + const firstSegmentName = snapshotName.append('50=%00').toString() console.log('debugTargetName: ', firstSegmentName) - let firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) + const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) if (firstSegmentPacketEncoded){ - let firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded,Data) + const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded,Data) console.log('persistentStore check: ', firstSegmentPacket) console.log('persistentStore check Data Name:', firstSegmentPacket.name.toString()) await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); @@ -182,25 +182,25 @@ export class NdnSvsAdaptor { // -- Adam Chen Injection Point 3: HandleSnapshotUpdate -- async handleSnapshotUpdate(snapshotName: Uint8Array){ // Maybe it's wise to put this under a try() because it might fail due to network issues. - let decodedSnapshotName = Decoder.decode(snapshotName, Name) + const decodedSnapshotName = Decoder.decode(snapshotName, Name) console.log('-- Adam Chen Injection Point 3: Update Latest Snapshot (Received) --') console.log('Handling received snapshot packet with name: ', decodedSnapshotName.toString()) - let snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") + const snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") console.log('snapshot prefix in persistStorage: ', snapshotPrefix.toString()) - let oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString()) + const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString()) let oldSVCount = 0 if (oldSnapshotFirstSegmentEncoded){ - let oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data) - let oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value,StateVector) - for (const [id,seq] of oldSnapshotVector){ + const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data) + const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value,StateVector) + for (const [_id,seq] of oldSnapshotVector){ oldSVCount += seq } } - let snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value,StateVector) + const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value,StateVector) let snapshotSVcount = 0 - for (const [id,seq] of snapshotSV){ + for (const [_id,seq] of snapshotSV){ snapshotSVcount += seq } @@ -208,14 +208,14 @@ async handleSnapshotUpdate(snapshotName: Uint8Array){ console.log('snapshot state vector total count: ', snapshotSVcount) if (snapshotSVcount>oldSVCount){ - let firstSegmentName = decodedSnapshotName.append('50=%00').toString() + const firstSegmentName = decodedSnapshotName.append('50=%00').toString() console.log('Retrieving the following from persist Storage: ', firstSegmentName) // await this.syncAgent.getBlob(decodedSnapshotName) await new Promise((r) => setTimeout(r, 1000)) - let firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) + const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) if (firstSegmentPacketEncoded){ console.log('Debug: Retrieval results: ', firstSegmentPacketEncoded) - let firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data) + const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data) console.log('Writing this packet: ', firstSegmentPacket.name.toString()) console.log('To this location: ', snapshotPrefix.toString()) // this is done to update the key of the prefix so program return latest when blind fetching. diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index b8ed134..aa57d9a 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -363,13 +363,13 @@ export class SyncAgent implements AsyncDisposable { if (intName.get(this.appPrefix.length)?.equals(getNamespace().snapshotKeyword)) { // console.log('snapshot interest detected, custom routine activated') - let wire = await this.persistStorage.get(intName.toString()) + const wire = await this.persistStorage.get(intName.toString()) if (wire === undefined || wire.length === 0) { // console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`); console.log('MISS: SnapshotInterest: ', intName.toString()) return undefined; } - let data = Decoder.decode(wire, Data) + const data = Decoder.decode(wire, Data) console.log('HIT: SnapshotInterest and Returned Data: ',intName.toString(),data.name.toString()) return data } From 2f1e98fcbbcd66c06a13af2491df4e8654a2bbbf Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Tue, 13 Aug 2024 23:41:32 -0700 Subject: [PATCH 03/11] lint fixes 2 --- src/adaptors/yjs-ndn-adaptor.ts | 183 ++++++++++++++++---------------- src/sync-agent/sync-agent.ts | 24 ++--- 2 files changed, 101 insertions(+), 106 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index ddfa7eb..e0316ea 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -5,7 +5,7 @@ import { Bundler } from './bundler.ts'; // Adam Chen Additional Imports import { Decoder, Encoder } from '@ndn/tlv'; -import { Component, Data, Name} from '@ndn/packet'; +import { Component, Data, Name } from '@ndn/packet'; import { Version } from '@ndn/naming-convention2'; import { StateVector } from '@ndn/svs'; @@ -40,7 +40,7 @@ export class NdnSvsAdaptor { ) { syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content)); // Adam Chen callback on receiving a snapshot blob for Injection Point 3 - syncAgent.register('blob','snapshot',(content) => this.handleSnapshotUpdate(content)) + syncAgent.register('blob', 'snapshot', (content) => this.handleSnapshotUpdate(content)); doc.on('update', this.callback); if (useBundler) { // this.#bundler = new Bundler( @@ -117,118 +117,115 @@ export class NdnSvsAdaptor { } else { // await this.syncAgent.publishUpdate(this.topic, content); - // Adam Chen Injection point 1 override - await this.publishUpdate(this.topic,content) + // Adam Chen Injection point 1 override + await this.publishUpdate(this.topic, content); } } // Adam Chen Injection point 1 - private async publishUpdate(topic:string,content:Uint8Array) { - await this.syncAgent.publishUpdate(topic, content) + private async publishUpdate(topic: string, content: Uint8Array) { + await this.syncAgent.publishUpdate(topic, content); // await new Promise(r => setTimeout(r,500)); // forced wait so that publishUpdate() is completed before we check SV. - console.log('-- Injection point 1: Check StateVector / Create Snapshot --') - const stateVector = this.syncAgent.getUpdateSyncSV() - console.log('debug: stateVector object: ',stateVector) - let count = 0 - for (const [_id,seq] of stateVector){ - count += seq + console.log('-- Injection point 1: Check StateVector / Create Snapshot --'); + const stateVector = this.syncAgent.getUpdateSyncSV(); + console.log('debug: stateVector object: ', stateVector); + let count = 0; + for (const [_id, seq] of stateVector) { + count += seq; } - console.log('Total count of state vector', count) - console.log('The above number should match the state vector in the debug page') - if (count % 5 == 0){ - console.log('It\'s time to make a snapshot!') - console.log('debug: group prefix: ', this.syncAgent.appPrefix.toString()) - const encodedSV = Encoder.encode(stateVector) - const snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") - // New SVS encodings - const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)) - console.log('debug: targeted snapshot Prefix (persistStore key): ', snapshotPrefix.toString()) - // /groupPrefix/32=snapshot/ - console.log('debug: targeted snapshot Name: ', snapshotName.toString()) - // /groupPrefix/32=snapshot/54= - const decodedSV = Decoder.decode(snapshotName.at(-1).value, StateVector) - console.log('debug: decoding encoded SV from snapshotName: ', decodedSV) - let count = 0 - for (const [_id,seq] of decodedSV){ - count += seq - } - console.log('debug: decoding encoded SV total packet count: ', count) - console.log('This should match the state vector in the debug page and the previous count before encoding') + console.log('Total count of state vector', count); + console.log('The above number should match the state vector in the debug page'); + if (count % 5 == 0) { + console.log("It's time to make a snapshot!"); + console.log('debug: group prefix: ', this.syncAgent.appPrefix.toString()); + const encodedSV = Encoder.encode(stateVector); + const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); + // New SVS encodings + const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)); + console.log('debug: targeted snapshot Prefix (persistStore key): ', snapshotPrefix.toString()); + // /groupPrefix/32=snapshot/ + console.log('debug: targeted snapshot Name: ', snapshotName.toString()); + // /groupPrefix/32=snapshot/54= + const decodedSV = Decoder.decode(snapshotName.at(-1).value, StateVector); + console.log('debug: decoding encoded SV from snapshotName: ', decodedSV); + let count = 0; + for (const [_id, seq] of decodedSV) { + count += seq; + } + console.log('debug: decoding encoded SV total packet count: ', count); + console.log('This should match the state vector in the debug page and the previous count before encoding'); - const content = Y.encodeStateAsUpdate(this.doc) - // its already in UTF8, transporting currently without any additional encoding. - console.log('yjs backend data: ',content) + const content = Y.encodeStateAsUpdate(this.doc); + // its already in UTF8, transporting currently without any additional encoding. + console.log('yjs backend data: ', content); - // use syncAgent's blob and publish mechanism - use a different topic. + // use syncAgent's blob and publish mechanism - use a different topic. - await this.syncAgent.publishBlob('snapshot',content,snapshotName,true) + await this.syncAgent.publishBlob('snapshot', content, snapshotName, true); - //first segmented object is at /50=%00 - const firstSegmentName = snapshotName.append('50=%00').toString() - console.log('debugTargetName: ', firstSegmentName) - const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) - if (firstSegmentPacketEncoded){ - const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded,Data) - console.log('persistentStore check: ', firstSegmentPacket) - console.log('persistentStore check Data Name:', firstSegmentPacket.name.toString()) - await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); - } - + //first segmented object is at /50=%00 + const firstSegmentName = snapshotName.append('50=%00').toString(); + console.log('debugTargetName: ', firstSegmentName); + const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); + if (firstSegmentPacketEncoded) { + const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data); + console.log('persistentStore check: ', firstSegmentPacket); + console.log('persistentStore check Data Name:', firstSegmentPacket.name.toString()); + await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); + } } -} -// End Injection point 1 + } + // End Injection point 1 -// -- Adam Chen Injection Point 3: HandleSnapshotUpdate -- -async handleSnapshotUpdate(snapshotName: Uint8Array){ + // -- Adam Chen Injection Point 3: HandleSnapshotUpdate -- + async handleSnapshotUpdate(snapshotName: Uint8Array) { // Maybe it's wise to put this under a try() because it might fail due to network issues. - const decodedSnapshotName = Decoder.decode(snapshotName, Name) - console.log('-- Adam Chen Injection Point 3: Update Latest Snapshot (Received) --') - console.log('Handling received snapshot packet with name: ', decodedSnapshotName.toString()) + const decodedSnapshotName = Decoder.decode(snapshotName, Name); + console.log('-- Adam Chen Injection Point 3: Update Latest Snapshot (Received) --'); + console.log('Handling received snapshot packet with name: ', decodedSnapshotName.toString()); - const snapshotPrefix = this.syncAgent.appPrefix.append("32=snapshot") - console.log('snapshot prefix in persistStorage: ', snapshotPrefix.toString()) - const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString()) - let oldSVCount = 0 - if (oldSnapshotFirstSegmentEncoded){ - const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data) - const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value,StateVector) - for (const [_id,seq] of oldSnapshotVector){ - oldSVCount += seq - } + const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); + console.log('snapshot prefix in persistStorage: ', snapshotPrefix.toString()); + const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString()); + let oldSVCount = 0; + if (oldSnapshotFirstSegmentEncoded) { + const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data); + const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value, StateVector); + for (const [_id, seq] of oldSnapshotVector) { + oldSVCount += seq; + } } - const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value,StateVector) - let snapshotSVcount = 0 - for (const [_id,seq] of snapshotSV){ - snapshotSVcount += seq + const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value, StateVector); + let snapshotSVcount = 0; + for (const [_id, seq] of snapshotSV) { + snapshotSVcount += seq; } - console.log('current state vector total count: ', oldSVCount) - console.log('snapshot state vector total count: ', snapshotSVcount) + console.log('current state vector total count: ', oldSVCount); + console.log('snapshot state vector total count: ', snapshotSVcount); - if (snapshotSVcount>oldSVCount){ - const firstSegmentName = decodedSnapshotName.append('50=%00').toString() - console.log('Retrieving the following from persist Storage: ', firstSegmentName) - // await this.syncAgent.getBlob(decodedSnapshotName) - await new Promise((r) => setTimeout(r, 1000)) - const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName) - if (firstSegmentPacketEncoded){ - console.log('Debug: Retrieval results: ', firstSegmentPacketEncoded) - const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data) - console.log('Writing this packet: ', firstSegmentPacket.name.toString()) - console.log('To this location: ', snapshotPrefix.toString()) - // this is done to update the key of the prefix so program return latest when blind fetching. - this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); - // should set snapshotPrefix to the newest packet. - } - else { - console.log('PersistentStorage doesnt have the snapshot yet. Skipping update.') - } - + if (snapshotSVcount > oldSVCount) { + const firstSegmentName = decodedSnapshotName.append('50=%00').toString(); + console.log('Retrieving the following from persist Storage: ', firstSegmentName); + // await this.syncAgent.getBlob(decodedSnapshotName) + await new Promise((r) => setTimeout(r, 1000)); + const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); + if (firstSegmentPacketEncoded) { + console.log('Debug: Retrieval results: ', firstSegmentPacketEncoded); + const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data); + console.log('Writing this packet: ', firstSegmentPacket.name.toString()); + console.log('To this location: ', snapshotPrefix.toString()); + // this is done to update the key of the prefix so program return latest when blind fetching. + this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); + // should set snapshotPrefix to the newest packet. + } else { + console.log('PersistentStorage doesnt have the snapshot yet. Skipping update.'); + } } -} -// End Injection point 3 + } + // End Injection point 3 public handleSyncUpdate(content: Uint8Array) { // Apply patch diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index aa57d9a..d1e3fa3 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -261,7 +261,7 @@ export class SyncAgent implements AsyncDisposable { const buffers: Uint8Array[] = []; //Adam Chen Debug - console.log('SyncAgent fetchBlob fetching: ', blobName.toString()) + console.log('SyncAgent fetchBlob fetching: ', blobName.toString()); try { const result = fetch(blobName, { verifier: this.verifier, @@ -285,7 +285,7 @@ export class SyncAgent implements AsyncDisposable { // Save blob (SA getBlob()) await this.persistStorage.set(blobName.toString(), blob); //Adam Chen Debug - console.log('SyncAgent fetchBlob complete.') + console.log('SyncAgent fetchBlob complete.'); } public register(channel: ChannelType, topic: string, handler: (content: Uint8Array, id: Name) => void) { @@ -360,21 +360,19 @@ export class SyncAgent implements AsyncDisposable { const intName = interest.name; // -- Adam Chen Injection point 2 -- - if (intName.get(this.appPrefix.length)?.equals(getNamespace().snapshotKeyword)) { // console.log('snapshot interest detected, custom routine activated') - const wire = await this.persistStorage.get(intName.toString()) + const wire = await this.persistStorage.get(intName.toString()); if (wire === undefined || wire.length === 0) { - // console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`); - console.log('MISS: SnapshotInterest: ', intName.toString()) - return undefined; - } - const data = Decoder.decode(wire, Data) - console.log('HIT: SnapshotInterest and Returned Data: ',intName.toString(),data.name.toString()) - return data + // console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`); + console.log('MISS: SnapshotInterest: ', intName.toString()); + return undefined; } - - // -- End Injection point 2 -- + const data = Decoder.decode(wire, Data); + console.log('HIT: SnapshotInterest and Returned Data: ', intName.toString(), data.name.toString()); + return data; + } + // -- End Injection point 2 -- if (intName.length <= this.appPrefix.length + 1) { // The name should be at least two components plus app prefix From bf95bab1542e804fb3b43b9cccbeb33695c136f6 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Wed, 14 Aug 2024 12:04:48 -0700 Subject: [PATCH 04/11] Annotations 1 --- src/adaptors/yjs-ndn-adaptor.ts | 39 +++++++++++++++++++++++++++++---- src/sync-agent/sync-agent.ts | 2 ++ 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index e0316ea..0298ffe 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -136,10 +136,16 @@ export class NdnSvsAdaptor { } console.log('Total count of state vector', count); console.log('The above number should match the state vector in the debug page'); + // Snapshot Interval configuration: Currently hard-coded + // TODO: make the interval configurable if (count % 5 == 0) { console.log("It's time to make a snapshot!"); console.log('debug: group prefix: ', this.syncAgent.appPrefix.toString()); const encodedSV = Encoder.encode(stateVector); + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + // TODO: Currently naming convention is hard-coded. May need organizing. const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); // New SVS encodings const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)); @@ -156,15 +162,20 @@ export class NdnSvsAdaptor { console.log('debug: decoding encoded SV total packet count: ', count); console.log('This should match the state vector in the debug page and the previous count before encoding'); + // Snapshot content generation const content = Y.encodeStateAsUpdate(this.doc); - // its already in UTF8, transporting currently without any additional encoding. + // its already in UInt8Array (binary), transporting currently without any additional encoding. console.log('yjs backend data: ', content); - // use syncAgent's blob and publish mechanism - use a different topic. - + // use syncAgent's blob and publish mechanism await this.syncAgent.publishBlob('snapshot', content, snapshotName, true); - //first segmented object is at /50=%00 + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + // Race Condition note: Testing suggests that the write above with publishBlob() + // is near certainly done before the read happens below. + // Hence no delay is added. + // first segmented object is at /50=%00 const firstSegmentName = snapshotName.append('50=%00').toString(); console.log('debugTargetName: ', firstSegmentName); const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); @@ -185,8 +196,14 @@ export class NdnSvsAdaptor { console.log('-- Adam Chen Injection Point 3: Update Latest Snapshot (Received) --'); console.log('Handling received snapshot packet with name: ', decodedSnapshotName.toString()); + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); + // /groupPrefix/32=snapshot/ console.log('snapshot prefix in persistStorage: ', snapshotPrefix.toString()); + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString()); let oldSVCount = 0; if (oldSnapshotFirstSegmentEncoded) { @@ -197,18 +214,29 @@ export class NdnSvsAdaptor { } } + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value, StateVector); let snapshotSVcount = 0; for (const [_id, seq] of snapshotSV) { snapshotSVcount += seq; } + // debug console.log('current state vector total count: ', oldSVCount); console.log('snapshot state vector total count: ', snapshotSVcount); + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. if (snapshotSVcount > oldSVCount) { const firstSegmentName = decodedSnapshotName.append('50=%00').toString(); console.log('Retrieving the following from persist Storage: ', firstSegmentName); + // Race Condition Note: The callback to here is faster than + // fetchBlob() finish writing to persistStore. + // (in syncAgent before listener callback to here) + // Tested getBlob() to guarantee item arrival + // But ends up having multiple active sessions of fetchBlob(). bad. + // Hence a delay of 1 second. // await this.syncAgent.getBlob(decodedSnapshotName) await new Promise((r) => setTimeout(r, 1000)); const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); @@ -217,11 +245,14 @@ export class NdnSvsAdaptor { const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data); console.log('Writing this packet: ', firstSegmentPacket.name.toString()); console.log('To this location: ', snapshotPrefix.toString()); + // utilize snapshotPrefix above, with the same namingConvention warning. // this is done to update the key of the prefix so program return latest when blind fetching. this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); // should set snapshotPrefix to the newest packet. } else { console.log('PersistentStorage doesnt have the snapshot yet. Skipping update.'); + // If the above race condition fails (reads before data arrives), + // 'endpoint's blind fetch mechanism' is not updated to latest, should be fine. } } } diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index d1e3fa3..1822787 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -360,6 +360,8 @@ export class SyncAgent implements AsyncDisposable { const intName = interest.name; // -- Adam Chen Injection point 2 -- + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. if (intName.get(this.appPrefix.length)?.equals(getNamespace().snapshotKeyword)) { // console.log('snapshot interest detected, custom routine activated') const wire = await this.persistStorage.get(intName.toString()); From 9d48e9696dc27d926a90a6b9dcb59ba28604ad95 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Wed, 14 Aug 2024 12:24:47 -0700 Subject: [PATCH 05/11] Move snapshot keyword out of namespace.ts --- src/sync-agent/namespace.ts | 5 ----- src/sync-agent/sync-agent.ts | 4 ++-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/sync-agent/namespace.ts b/src/sync-agent/namespace.ts index 3956e89..e9f70c4 100644 --- a/src/sync-agent/namespace.ts +++ b/src/sync-agent/namespace.ts @@ -53,9 +53,6 @@ export type SyncAgentNamespace = { /** Keyword component for latest only delivery. Default is `32=late` */ readonly latestOnlyKeyword: Component; - - /** Adam Chen: Keyword component for snapshots. Default is `32=snapshot` */ - readonly snapshotKeyword: Component; }; export function getNamespace(): SyncAgentNamespace { @@ -106,8 +103,6 @@ function createDefaultNamespace(): SyncAgentNamespace { syncKeyword: KeywordComponent.create('sync'), atLeastOnceKeyword: KeywordComponent.create('alo'), latestOnlyKeyword: KeywordComponent.create('late'), - // Adam Chen Add snapshotKeyword - snapshotKeyword: KeywordComponent.create('snapshot'), }; return ret; } diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index 1822787..362b7a1 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -1,6 +1,6 @@ import * as endpoint from '@ndn/endpoint'; import type { Forwarder } from '@ndn/fw'; -import { Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet'; +import { Component, Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet'; import { Decoder, Encoder } from '@ndn/tlv'; import { BufferChunkSource, DataProducer, fetch } from '@ndn/segmented-object'; import { concatBuffers, fromHex } from '@ndn/util'; @@ -362,7 +362,7 @@ export class SyncAgent implements AsyncDisposable { // -- Adam Chen Injection point 2 -- // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. - if (intName.get(this.appPrefix.length)?.equals(getNamespace().snapshotKeyword)) { + if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) { // console.log('snapshot interest detected, custom routine activated') const wire = await this.persistStorage.get(intName.toString()); if (wire === undefined || wire.length === 0) { From 05a716e82fa39fb81c1f247999569468fe743cd7 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Sat, 28 Sep 2024 15:40:53 -0700 Subject: [PATCH 06/11] cleanup --- src/adaptors/yjs-ndn-adaptor.ts | 53 ++------------------------------- src/sync-agent/sync-agent.ts | 7 ----- 2 files changed, 2 insertions(+), 58 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index 0298ffe..3848d67 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -43,16 +43,6 @@ export class NdnSvsAdaptor { syncAgent.register('blob', 'snapshot', (content) => this.handleSnapshotUpdate(content)); doc.on('update', this.callback); if (useBundler) { - // this.#bundler = new Bundler( - // Y.mergeUpdates, - // (content) => this.syncAgent.publishUpdate(this.topic, content), - // { - // thresholdSize: 3000, - // delayMs: 400, - // maxDelayMs: 1600, - // }, - // ); - // Adam Chen Injection Point 1 override this.#bundler = new Bundler( Y.mergeUpdates, @@ -115,8 +105,6 @@ export class NdnSvsAdaptor { if (this.#bundler) { await this.#bundler.produce(content); } else { - // await this.syncAgent.publishUpdate(this.topic, content); - // Adam Chen Injection point 1 override await this.publishUpdate(this.topic, content); } @@ -125,22 +113,15 @@ export class NdnSvsAdaptor { // Adam Chen Injection point 1 private async publishUpdate(topic: string, content: Uint8Array) { await this.syncAgent.publishUpdate(topic, content); - // await new Promise(r => setTimeout(r,500)); - // forced wait so that publishUpdate() is completed before we check SV. - console.log('-- Injection point 1: Check StateVector / Create Snapshot --'); + const stateVector = this.syncAgent.getUpdateSyncSV(); - console.log('debug: stateVector object: ', stateVector); let count = 0; for (const [_id, seq] of stateVector) { count += seq; } - console.log('Total count of state vector', count); - console.log('The above number should match the state vector in the debug page'); // Snapshot Interval configuration: Currently hard-coded // TODO: make the interval configurable if (count % 5 == 0) { - console.log("It's time to make a snapshot!"); - console.log('debug: group prefix: ', this.syncAgent.appPrefix.toString()); const encodedSV = Encoder.encode(stateVector); // NOTE: The following code depend on snapshot naming convention to work. @@ -149,24 +130,10 @@ export class NdnSvsAdaptor { const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); // New SVS encodings const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)); - console.log('debug: targeted snapshot Prefix (persistStore key): ', snapshotPrefix.toString()); - // /groupPrefix/32=snapshot/ - console.log('debug: targeted snapshot Name: ', snapshotName.toString()); - // /groupPrefix/32=snapshot/54= - const decodedSV = Decoder.decode(snapshotName.at(-1).value, StateVector); - console.log('debug: decoding encoded SV from snapshotName: ', decodedSV); - let count = 0; - for (const [_id, seq] of decodedSV) { - count += seq; - } - console.log('debug: decoding encoded SV total packet count: ', count); - console.log('This should match the state vector in the debug page and the previous count before encoding'); // Snapshot content generation const content = Y.encodeStateAsUpdate(this.doc); // its already in UInt8Array (binary), transporting currently without any additional encoding. - console.log('yjs backend data: ', content); - // use syncAgent's blob and publish mechanism await this.syncAgent.publishBlob('snapshot', content, snapshotName, true); @@ -177,12 +144,9 @@ export class NdnSvsAdaptor { // Hence no delay is added. // first segmented object is at /50=%00 const firstSegmentName = snapshotName.append('50=%00').toString(); - console.log('debugTargetName: ', firstSegmentName); const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); if (firstSegmentPacketEncoded) { const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data); - console.log('persistentStore check: ', firstSegmentPacket); - console.log('persistentStore check Data Name:', firstSegmentPacket.name.toString()); await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); } } @@ -193,14 +157,10 @@ export class NdnSvsAdaptor { async handleSnapshotUpdate(snapshotName: Uint8Array) { // Maybe it's wise to put this under a try() because it might fail due to network issues. const decodedSnapshotName = Decoder.decode(snapshotName, Name); - console.log('-- Adam Chen Injection Point 3: Update Latest Snapshot (Received) --'); - console.log('Handling received snapshot packet with name: ', decodedSnapshotName.toString()); // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); - // /groupPrefix/32=snapshot/ - console.log('snapshot prefix in persistStorage: ', snapshotPrefix.toString()); // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. @@ -222,35 +182,26 @@ export class NdnSvsAdaptor { snapshotSVcount += seq; } - // debug - console.log('current state vector total count: ', oldSVCount); - console.log('snapshot state vector total count: ', snapshotSVcount); - // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. if (snapshotSVcount > oldSVCount) { const firstSegmentName = decodedSnapshotName.append('50=%00').toString(); - console.log('Retrieving the following from persist Storage: ', firstSegmentName); // Race Condition Note: The callback to here is faster than // fetchBlob() finish writing to persistStore. // (in syncAgent before listener callback to here) // Tested getBlob() to guarantee item arrival // But ends up having multiple active sessions of fetchBlob(). bad. // Hence a delay of 1 second. - // await this.syncAgent.getBlob(decodedSnapshotName) await new Promise((r) => setTimeout(r, 1000)); const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); if (firstSegmentPacketEncoded) { - console.log('Debug: Retrieval results: ', firstSegmentPacketEncoded); const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data); - console.log('Writing this packet: ', firstSegmentPacket.name.toString()); - console.log('To this location: ', snapshotPrefix.toString()); // utilize snapshotPrefix above, with the same namingConvention warning. // this is done to update the key of the prefix so program return latest when blind fetching. this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); // should set snapshotPrefix to the newest packet. } else { - console.log('PersistentStorage doesnt have the snapshot yet. Skipping update.'); + console.debug('PersistentStorage doesnt have the snapshot yet. Skipping update.'); // If the above race condition fails (reads before data arrives), // 'endpoint's blind fetch mechanism' is not updated to latest, should be fine. } diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index e2b550a..f0e953a 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -260,8 +260,6 @@ export class SyncAgent implements AsyncDisposable { } const buffers: Uint8Array[] = []; - //Adam Chen Debug - console.log('SyncAgent fetchBlob fetching: ', blobName.toString()); try { const result = fetch(blobName, { verifier: this.verifier, @@ -283,8 +281,6 @@ export class SyncAgent implements AsyncDisposable { // Save blob (SA getBlob()) await this.persistStorage.set(blobName.toString(), blob); - //Adam Chen Debug - console.log('SyncAgent fetchBlob complete.'); } public register(channel: ChannelType, topic: string, handler: (content: Uint8Array, id: Name) => void) { @@ -362,15 +358,12 @@ export class SyncAgent implements AsyncDisposable { // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) { - // console.log('snapshot interest detected, custom routine activated') const wire = await this.persistStorage.get(intName.toString()); if (wire === undefined || wire.length === 0) { // console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`); - console.log('MISS: SnapshotInterest: ', intName.toString()); return undefined; } const data = Decoder.decode(wire, Data); - console.log('HIT: SnapshotInterest and Returned Data: ', intName.toString(), data.name.toString()); return data; } // -- End Injection point 2 -- From 715aa878ff3d0fa1fe0100f377d84190a64a174d Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Mon, 30 Sep 2024 14:28:00 -0700 Subject: [PATCH 07/11] comment fix --- src/adaptors/yjs-ndn-adaptor.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index 3848d67..27c77d2 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -2,8 +2,6 @@ import { SyncAgent } from '../sync-agent/mod.ts'; import * as Y from 'yjs'; import { Awareness } from 'y-protocols/awareness.js'; import { Bundler } from './bundler.ts'; - -// Adam Chen Additional Imports import { Decoder, Encoder } from '@ndn/tlv'; import { Component, Data, Name } from '@ndn/packet'; import { Version } from '@ndn/naming-convention2'; @@ -43,7 +41,6 @@ export class NdnSvsAdaptor { syncAgent.register('blob', 'snapshot', (content) => this.handleSnapshotUpdate(content)); doc.on('update', this.callback); if (useBundler) { - // Adam Chen Injection Point 1 override this.#bundler = new Bundler( Y.mergeUpdates, (content) => this.publishUpdate(this.topic, content), From 0041767bdd0306676852ed5a422715ffc6edf797 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Mon, 30 Sep 2024 15:32:32 -0700 Subject: [PATCH 08/11] untested: constructor arguments --- src/adaptors/yjs-ndn-adaptor.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index 27c77d2..6937ede 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -34,11 +34,13 @@ export class NdnSvsAdaptor { public syncAgent: SyncAgent, public readonly doc: Y.Doc, public readonly topic: string, + public readonly snapshotTopic: string = 'snapshot', + public readonly snapshotFrequency: number = 10, useBundler: boolean = false, ) { syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content)); // Adam Chen callback on receiving a snapshot blob for Injection Point 3 - syncAgent.register('blob', 'snapshot', (content) => this.handleSnapshotUpdate(content)); + syncAgent.register('blob', snapshotTopic, (content) => this.handleSnapshotUpdate(content)); doc.on('update', this.callback); if (useBundler) { this.#bundler = new Bundler( @@ -118,7 +120,7 @@ export class NdnSvsAdaptor { } // Snapshot Interval configuration: Currently hard-coded // TODO: make the interval configurable - if (count % 5 == 0) { + if (count % this.snapshotFrequency == 0) { const encodedSV = Encoder.encode(stateVector); // NOTE: The following code depend on snapshot naming convention to work. From f7904889365f390b53323ad5bcf7c22a77f131f7 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Wed, 2 Oct 2024 13:26:02 -0700 Subject: [PATCH 09/11] clean up part 2 --- src/adaptors/yjs-ndn-adaptor.ts | 11 ++--------- src/sync-agent/sync-agent.ts | 2 -- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index 6937ede..fdbe1ef 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -39,7 +39,6 @@ export class NdnSvsAdaptor { useBundler: boolean = false, ) { syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content)); - // Adam Chen callback on receiving a snapshot blob for Injection Point 3 syncAgent.register('blob', snapshotTopic, (content) => this.handleSnapshotUpdate(content)); doc.on('update', this.callback); if (useBundler) { @@ -104,12 +103,10 @@ export class NdnSvsAdaptor { if (this.#bundler) { await this.#bundler.produce(content); } else { - // Adam Chen Injection point 1 override await this.publishUpdate(this.topic, content); } } - // Adam Chen Injection point 1 private async publishUpdate(topic: string, content: Uint8Array) { await this.syncAgent.publishUpdate(topic, content); @@ -118,14 +115,13 @@ export class NdnSvsAdaptor { for (const [_id, seq] of stateVector) { count += seq; } - // Snapshot Interval configuration: Currently hard-coded - // TODO: make the interval configurable + if (count % this.snapshotFrequency == 0) { const encodedSV = Encoder.encode(stateVector); // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. - // TODO: Currently naming convention is hard-coded. May need organizing. + // NOTE: Currently naming convention is hard-coded. May need organizing. const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); // New SVS encodings const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)); @@ -150,9 +146,7 @@ export class NdnSvsAdaptor { } } } - // End Injection point 1 - // -- Adam Chen Injection Point 3: HandleSnapshotUpdate -- async handleSnapshotUpdate(snapshotName: Uint8Array) { // Maybe it's wise to put this under a try() because it might fail due to network issues. const decodedSnapshotName = Decoder.decode(snapshotName, Name); @@ -206,7 +200,6 @@ export class NdnSvsAdaptor { } } } - // End Injection point 3 public handleSyncUpdate(content: Uint8Array) { // Apply patch diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index f0e953a..e94f446 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -354,7 +354,6 @@ export class SyncAgent implements AsyncDisposable { async serve(interest: Interest) { const intName = interest.name; - // -- Adam Chen Injection point 2 -- // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) { @@ -366,7 +365,6 @@ export class SyncAgent implements AsyncDisposable { const data = Decoder.decode(wire, Data); return data; } - // -- End Injection point 2 -- if (intName.length <= this.appPrefix.length + 1) { // The name should be at least two components plus app prefix From c24e5eb31c17dfac261fcb7192083f5056d6957c Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Wed, 2 Oct 2024 13:37:06 -0700 Subject: [PATCH 10/11] add comment --- src/adaptors/yjs-ndn-adaptor.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index fdbe1ef..2d4989b 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -177,6 +177,11 @@ export class NdnSvsAdaptor { // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. + // NOTE: From Github Discussion: + // Though, this "update the snapshot response strategy on receiving new snapshot from SVS" logic is somewhat optional in nature. + // It is ran, such that if a blind fetch request reaches an endpoint, endpoint returns a good response. + // Just like snapshot responses, we don't have to guarantee absolute latest when it is about blind fetching. + // hence we can just use a rough "total count" for determining if it needs an update. if (snapshotSVcount > oldSVCount) { const firstSegmentName = decodedSnapshotName.append('50=%00').toString(); // Race Condition Note: The callback to here is faster than From 2427535a584f14e2b775208ef31bee163bfe0b55 Mon Sep 17 00:00:00 2001 From: Adam Chen Date: Wed, 2 Oct 2024 21:29:37 -0700 Subject: [PATCH 11/11] hotfix lint --- src/adaptors/yjs-ndn-adaptor.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index 2d4989b..813529c 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -178,8 +178,8 @@ export class NdnSvsAdaptor { // NOTE: The following code depend on snapshot naming convention to work. // Verify this part if there's a change in naming convention. // NOTE: From Github Discussion: - // Though, this "update the snapshot response strategy on receiving new snapshot from SVS" logic is somewhat optional in nature. - // It is ran, such that if a blind fetch request reaches an endpoint, endpoint returns a good response. + // Though, this "update the snapshot response strategy on receiving new snapshot from SVS" logic is somewhat optional in nature. + // It is ran, such that if a blind fetch request reaches an endpoint, endpoint returns a good response. // Just like snapshot responses, we don't have to guarantee absolute latest when it is about blind fetching. // hence we can just use a rough "total count" for determining if it needs an update. if (snapshotSVcount > oldSVCount) {