From 330c34cf600e57c035e3a0f26fdc6db8cd651aef Mon Sep 17 00:00:00 2001 From: Xinyu Ma Date: Thu, 14 Mar 2024 06:15:09 -0700 Subject: [PATCH] Remove the use of endpoint --- deno.jsonc | 30 ++++++++++---- package.json | 2 +- src/namespace/base-node.ts | 10 ++--- src/namespace/expressing-point.ts | 6 ++- src/namespace/nt-schema.test.ts | 26 +++++------- src/namespace/nt-schema.ts | 26 ++++++++---- .../segmented-object/fetcher.test.ts | 7 +--- .../segmented-object/segmented-object.test.ts | 10 ++--- src/namespace/sync/sync.ts | 2 +- src/security/cert-storage.test.ts | 41 +++++++------------ src/security/cert-storage.ts | 12 +++--- src/storage/deno-kv.ts | 1 + src/sync-agent/deliveries.ts | 29 ++++++------- src/sync-agent/delivery-alo.test.ts | 14 +++---- src/sync-agent/sync-agent.ts | 15 ++++--- src/utils/responder.ts | 8 ++-- src/workspace/workspace.ts | 10 ++--- 17 files changed, 129 insertions(+), 120 deletions(-) diff --git a/deno.jsonc b/deno.jsonc index 24e7fce..bb0a496 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -1,7 +1,10 @@ { - "tasks": { - }, - "unstable": ["byonm", "net", "fs"], + "tasks": {}, + "unstable": [ + "byonm", + "net", + "fs" + ], "compilerOptions": { "types": [ "@types/wicg-file-system-access" @@ -13,12 +16,25 @@ "indentWidth": 2, "useTabs": false, "semiColons": true, - "include": ["src/**", "build.ts"] + "include": [ + "src/**", + "build.ts" + ] }, "lint": { - "include": ["src/**", "build.ts"] + "include": [ + "src/**", + "build.ts" + ], + "rules": { + "include": [ + "deprecated" + ] + } }, "test": { - "exclude": ["dist/"] + "exclude": [ + "dist/" + ] } -} +} \ No newline at end of file diff --git a/package.json b/package.json index d129ecf..f3c8fd5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ucla-irl/ndnts-aux", - "version": "1.1.5", + "version": "2.0.0", "description": "NDNts Auxiliary Package for Web and Deno", "scripts": { "test": "deno test --no-check", diff --git a/src/namespace/base-node.ts b/src/namespace/base-node.ts index 637909c..91487f2 100644 --- a/src/namespace/base-node.ts +++ b/src/namespace/base-node.ts @@ -1,4 +1,4 @@ -import { Endpoint } from '@ndn/endpoint'; +import type { Forwarder } from '@ndn/fw'; import type { Data, Interest, Verifier } from '@ndn/packet'; import * as namePattern from './name-pattern.ts'; import * as schemaTree from './schema-tree.ts'; @@ -6,8 +6,8 @@ import { EventChain } from '../utils/event-chain.ts'; import { NamespaceHandler } from './nt-schema.ts'; export interface BaseNodeEvents { - attach(path: namePattern.Pattern, endpoint: Endpoint): Promise; - detach(endpoint: Endpoint): Promise; + attach(path: namePattern.Pattern, fw: Forwarder): Promise; + detach(fw: Forwarder): Promise; } export class BaseNode { @@ -58,11 +58,11 @@ export class BaseNode { public async processAttach(path: namePattern.Pattern, handler: NamespaceHandler) { // All children's attach events are called this.handler = handler; - await this.onAttach.emit(path, handler.endpoint!); + await this.onAttach.emit(path, handler.fw!); } public async processDetach() { - await this.onDetach.emit(this.handler!.endpoint!); + await this.onDetach.emit(this.handler!.fw!); this.handler = undefined; // Then call children's detach } diff --git a/src/namespace/expressing-point.ts b/src/namespace/expressing-point.ts index 2fc55f6..30a4800 100644 --- a/src/namespace/expressing-point.ts +++ b/src/namespace/expressing-point.ts @@ -1,4 +1,4 @@ -import { RetxPolicy } from '@ndn/endpoint'; +import { consume, RetxPolicy } from '@ndn/endpoint'; import { Data, Interest, Signer, type Verifier } from '@ndn/packet'; import * as schemaTree from './schema-tree.ts'; import { BaseNode, BaseNodeEvents } from './base-node.ts'; @@ -198,7 +198,9 @@ export class ExpressingPoint extends BaseNode { throw new Error(`Interest surpressed: ${interestName.toString()} @${this.describe}`); } - const data = await this.handler!.endpoint!.consume(interest, { + // TODO: Handle data directly instead of relying on consume. Because of the segmented object. + const data = await consume(interest, { + fw: this.handler!.fw!, // deno-lint-ignore no-explicit-any signal: opts.abortSignal as any, retx: this.config.retx, diff --git a/src/namespace/nt-schema.test.ts b/src/namespace/nt-schema.test.ts index 544502e..fb0bf5c 100644 --- a/src/namespace/nt-schema.test.ts +++ b/src/namespace/nt-schema.test.ts @@ -1,6 +1,6 @@ import { assert } from '../dep.ts'; import { AsyncDisposableStack, name, Responder } from '../utils/mod.ts'; -import { Endpoint } from '@ndn/endpoint'; +import { consume } from '@ndn/endpoint'; import { Data, digestSigning, SigType } from '@ndn/packet'; import { Decoder, Encoder } from '@ndn/tlv'; import { Bridge } from '@ndn/l3face'; @@ -15,8 +15,6 @@ export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(val Deno.test('NtSchema.1 Basic Interest and Data', async () => { using bridge = Bridge.create({}); const { fwA, fwB } = bridge; - const epA = new Endpoint({ fw: fwA }); - const epB = new Endpoint({ fw: fwB }); await using closers = new AsyncDisposableStack(); const appPrefix = name`/prefix`; @@ -44,13 +42,13 @@ Deno.test('NtSchema.1 Basic Interest and Data', async () => { await digestSigning.sign(data3); return data3; }); - await schema.attach(appPrefix, epA); + await schema.attach(appPrefix, fwA); closers.defer(async () => await schema.detach()); // Responder side const storage = new InMemoryStorage(); closers.use(storage); - const responder = new Responder(appPrefix, epB, storage); + const responder = new Responder(appPrefix, fwB, storage); closers.use(responder); const data1 = new Data( name`${appPrefix}/records/8=rec1`, @@ -85,8 +83,9 @@ Deno.test('NtSchema.1 Basic Interest and Data', async () => { assert.assertEquals(recved2.content, b`World.`); // Test NTSchema's producing data (on request, without storage) - const recved3 = await epB.consume(name`${appPrefix}/records/8=rec3`, { + const recved3 = await consume(name`${appPrefix}/records/8=rec3`, { verifier: digestSigning, + fw: fwB, }); assert.assertExists(recved3); assert.assert(recved3.name.equals(name`${appPrefix}/records/8=rec3`)); @@ -97,8 +96,6 @@ Deno.test('NtSchema.1 Basic Interest and Data', async () => { Deno.test('NtSchema.2 Data Storage', async () => { using bridge = Bridge.create({}); const { fwA, fwB } = bridge; - const epA = new Endpoint({ fw: fwA }); - const epB = new Endpoint({ fw: fwB }); await using closers = new AsyncDisposableStack(); const appPrefix = name`/prefix`; @@ -127,13 +124,13 @@ Deno.test('NtSchema.2 Data Storage', async () => { return undefined; } }); - await schema.attach(appPrefix, epA); + await schema.attach(appPrefix, fwA); closers.defer(async () => await schema.detach()); // Responder side const storageB = new InMemoryStorage(); closers.use(storageB); - const responder = new Responder(appPrefix, epB, storageB); + const responder = new Responder(appPrefix, fwB, storageB); closers.use(responder); const data1 = new Data( name`${appPrefix}/records/8=rec1`, @@ -149,8 +146,9 @@ Deno.test('NtSchema.2 Data Storage', async () => { 'provide', b`World.`, ); - const received = await epB.consume(name`${appPrefix}/records/8=rec2`, { + const received = await consume(name`${appPrefix}/records/8=rec2`, { verifier: digestSigning, + fw: fwB, }); assert.assertExists(received); assert.assert(received.name.equals(name`${appPrefix}/records/8=rec2`)); @@ -178,8 +176,6 @@ Deno.test('NtSchema.2 Data Storage', async () => { Deno.test('NtSchema.3 Verification', async () => { using bridge = Bridge.create({}); const { fwA, fwB } = bridge; - const epA = new Endpoint({ fw: fwA }); - const epB = new Endpoint({ fw: fwB }); await using closers = new AsyncDisposableStack(); const appPrefix = name`/prefix`; const [prvKey, pubKey] = await generateSigningKey(/*identity*/ appPrefix, Ed25519); @@ -216,13 +212,13 @@ Deno.test('NtSchema.3 Verification', async () => { return prevResult; } }); - await schema.attach(appPrefix, epA); + await schema.attach(appPrefix, fwA); closers.defer(async () => await schema.detach()); // Responder side const storage = new InMemoryStorage(); closers.use(storage); - const responder = new Responder(appPrefix, epB, storage); + const responder = new Responder(appPrefix, fwB, storage); closers.use(responder); const data1 = new Data( name`${appPrefix}/records/8=rec1`, diff --git a/src/namespace/nt-schema.ts b/src/namespace/nt-schema.ts index 6ba33f9..e6626cc 100644 --- a/src/namespace/nt-schema.ts +++ b/src/namespace/nt-schema.ts @@ -1,4 +1,5 @@ -import { Endpoint, Producer } from '@ndn/endpoint'; +import { produce, Producer } from '@ndn/endpoint'; +import type { Forwarder } from '@ndn/fw'; import { Data, Interest, Name, type Verifier } from '@ndn/packet'; import * as namePattern from './name-pattern.ts'; import * as schemaTree from './schema-tree.ts'; @@ -14,7 +15,7 @@ export enum VerifyResult { } export interface NamespaceHandler { - get endpoint(): Endpoint | undefined; + get fw(): Forwarder | undefined; get attachedPrefix(): Name | undefined; getVerifier(deadline: number | undefined, verificationContext?: Record): Verifier; storeData(data: Data): Promise; @@ -22,12 +23,12 @@ export interface NamespaceHandler { export class NtSchema implements NamespaceHandler, AsyncDisposable { public readonly tree = schemaTree.create(); - protected _endpoint: Endpoint | undefined; + protected _fw: Forwarder | undefined; protected _attachedPrefix: Name | undefined; protected _producer: Producer | undefined; - get endpoint() { - return this._endpoint; + get fw() { + return this._fw; } get attachedPrefix() { @@ -71,17 +72,24 @@ export class NtSchema implements NamespaceHandler, AsyncDisposable { return undefined; } - public async attach(prefix: Name, endpoint: Endpoint) { + public async attach(prefix: Name, fw: Forwarder) { + if (this._fw !== undefined) { + if (this._fw !== fw) { + throw new Error('You cannot attach a running NTSchema to another forwarder'); + } + return; + } this._attachedPrefix = prefix; - this._endpoint = endpoint; + this._fw = fw; await schemaTree.traverse(this.tree, { post: async (node, path) => await node.resource?.processAttach(path, this), }); - this._producer = endpoint.produce(prefix, this.onInterest.bind(this), { + this._producer = produce(prefix, this.onInterest.bind(this), { describe: `NtSchema[${prefix.toString()}]`, routeCapture: false, announcement: prefix, + fw: fw, }); } @@ -90,7 +98,7 @@ export class NtSchema implements NamespaceHandler, AsyncDisposable { await schemaTree.traverse(this.tree, { pre: async (node) => await node.resource?.processDetach(), }); - this._endpoint = undefined; + this._fw = undefined; this._attachedPrefix = undefined; } diff --git a/src/namespace/segmented-object/fetcher.test.ts b/src/namespace/segmented-object/fetcher.test.ts index 7cab498..b4e1d8d 100644 --- a/src/namespace/segmented-object/fetcher.test.ts +++ b/src/namespace/segmented-object/fetcher.test.ts @@ -1,6 +1,5 @@ import { assert } from '../../dep.ts'; import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts'; -import { Endpoint } from '@ndn/endpoint'; import { Data, digestSigning } from '@ndn/packet'; import { Encoder } from '@ndn/tlv'; import { Bridge } from '@ndn/l3face'; @@ -14,8 +13,6 @@ export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(val Deno.test('Fetcher.1 Basic fetching', async () => { using bridge = Bridge.create({}); const { fwA, fwB } = bridge; - const epA = new Endpoint({ fw: fwA }); - const epB = new Endpoint({ fw: fwB }); await using closers = new AsyncDisposableStack(); const appPrefix = name`/prefix`; @@ -30,13 +27,13 @@ Deno.test('Fetcher.1 Basic fetching', async () => { return VerifyResult.Fail; } }); - await schema.attach(appPrefix, epA); + await schema.attach(appPrefix, fwA); closers.defer(async () => await schema.detach()); // Responder side const storage = new InMemoryStorage(); closers.use(storage); - const responder = new Responder(appPrefix, epB, storage); + const responder = new Responder(appPrefix, fwB, storage); closers.use(responder); const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`]; for (const [i, p] of payloads.entries()) { diff --git a/src/namespace/segmented-object/segmented-object.test.ts b/src/namespace/segmented-object/segmented-object.test.ts index 6814919..e7ff87a 100644 --- a/src/namespace/segmented-object/segmented-object.test.ts +++ b/src/namespace/segmented-object/segmented-object.test.ts @@ -1,6 +1,5 @@ import { assert } from '../../dep.ts'; import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts'; -import { Endpoint } from '@ndn/endpoint'; import { Data, digestSigning } from '@ndn/packet'; import { Decoder, Encoder } from '@ndn/tlv'; import { Bridge } from '@ndn/l3face'; @@ -16,8 +15,6 @@ export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(val Deno.test('SegmentedObject.1 Basic fetching', async () => { using bridge = Bridge.create({}); const { fwA, fwB } = bridge; - const epA = new Endpoint({ fw: fwA }); - const epB = new Endpoint({ fw: fwB }); await using closers = new AsyncDisposableStack(); const appPrefix = name`/prefix`; @@ -36,13 +33,13 @@ Deno.test('SegmentedObject.1 Basic fetching', async () => { leafNode, lifetimeAfterRto: 100, }); - await schema.attach(appPrefix, epA); + await schema.attach(appPrefix, fwA); closers.defer(async () => await schema.detach()); // Responder side const storage = new InMemoryStorage(); closers.use(storage); - const responder = new Responder(appPrefix, epB, storage); + const responder = new Responder(appPrefix, fwB, storage); closers.use(responder); const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`]; for (const [i, p] of payloads.entries()) { @@ -70,7 +67,6 @@ Deno.test('SegmentedObject.1 Basic fetching', async () => { Deno.test('SegmentedObject.2 Basic provide', async () => { using bridge = Bridge.create({}); const { fwA, fwB } = bridge; - const epA = new Endpoint({ fw: fwA }); await using closers = new AsyncDisposableStack(); const appPrefix = name`/prefix`; @@ -102,7 +98,7 @@ Deno.test('SegmentedObject.2 Basic provide', async () => { leafNode, lifetimeAfterRto: 100, }); - await schema.attach(appPrefix, epA); + await schema.attach(appPrefix, fwA); closers.defer(async () => await schema.detach()); // Provide object diff --git a/src/namespace/sync/sync.ts b/src/namespace/sync/sync.ts index 948b143..c8f9e89 100644 --- a/src/namespace/sync/sync.ts +++ b/src/namespace/sync/sync.ts @@ -63,7 +63,7 @@ export class SvsInstNode extends BaseNode { const describe = this.describe ? `${this.describe}(${matched.name.toString()})` : undefined; const ret = await SvSync.create({ - endpoint: this.handler!.endpoint!, + fw: this.handler!.fw!, syncPrefix: matched.name, signer: signer, verifier: verifier, diff --git a/src/security/cert-storage.test.ts b/src/security/cert-storage.test.ts index c2d14cd..9b1b83e 100644 --- a/src/security/cert-storage.test.ts +++ b/src/security/cert-storage.test.ts @@ -1,16 +1,8 @@ -import { Endpoint } from '@ndn/endpoint'; import { Encoder } from '@ndn/tlv'; import { Forwarder } from '@ndn/fw'; -import { Data, Interest } from '@ndn/packet'; -import { - Certificate, - CertNaming, - createSigner, - createVerifier, - ECDSA, - generateSigningKey, - ValidityPeriod, -} from '@ndn/keychain'; +import * as endpoint from '@ndn/endpoint'; +import { Data, Interest, ValidityPeriod } from '@ndn/packet'; +import { Certificate, CertNaming, createSigner, createVerifier, ECDSA, generateSigningKey } from '@ndn/keychain'; import { InMemoryStorage } from '../storage/mod.ts'; import { CertStorage } from './cert-storage.ts'; import { assert as assertMod } from '../dep.ts'; @@ -58,7 +50,6 @@ Deno.test('Known certificates', async () => { closers.use(storage); const fwAB = Forwarder.create(); closers.defer(() => fwAB.close()); - const endpoint = new Endpoint({ fw: fwAB }); // const responder = new Responder(appPrefix, endpoint, storage); // closers.use(responder); @@ -66,7 +57,7 @@ Deno.test('Known certificates', async () => { storage.set(ownCert.name.toString(), Encoder.encode(ownCert.data)); storage.set(otherCert.name.toString(), Encoder.encode(otherCert.data)); - const cs = await CertStorage.create(anchor, ownCert, storage, endpoint, new Uint8Array(ownPrvKeyBits), 800); + const cs = await CertStorage.create(anchor, ownCert, storage, fwAB, new Uint8Array(ownPrvKeyBits), 800); // await cs.verifier.verify(anchor.data); // Unable to do so, since it is not signed with a Cert name. await cs.verifier.verify(ownCert.data); @@ -122,8 +113,7 @@ Deno.test('Fetch missing certificate once', async () => { closers.use(storage2); const fwAB = Forwarder.create(); closers.defer(() => fwAB.close()); - const endpoint = new Endpoint({ fw: fwAB }); - const responder = new Responder(appPrefix, endpoint, storage2); + const responder = new Responder(appPrefix, fwAB, storage2); closers.use(responder); storage.set(anchor.name.toString(), Encoder.encode(anchor.data)); @@ -131,7 +121,7 @@ Deno.test('Fetch missing certificate once', async () => { storage2.set(otherCert.name.toString(), Encoder.encode(otherCert.data)); storage2.set(dataToFetch.name.toString(), Encoder.encode(dataToFetch)); - const cs = await CertStorage.create(anchor, ownCert, storage, endpoint, new Uint8Array(ownPrvKeyBits), 800); + const cs = await CertStorage.create(anchor, ownCert, storage, fwAB, new Uint8Array(ownPrvKeyBits), 800); const fetchedData = await endpoint.consume( new Interest( @@ -141,6 +131,7 @@ Deno.test('Fetch missing certificate once', async () => { ), { verifier: cs.verifier, + fw: fwAB, }, ); assertEquals(fetchedData.content, new TextEncoder().encode('Hello World')); @@ -173,12 +164,11 @@ Deno.test('Properly sign packets', async () => { closers.use(storage); const fwAB = Forwarder.create(); closers.defer(() => fwAB.close()); - const endpoint = new Endpoint({ fw: fwAB }); storage.set(anchor.name.toString(), Encoder.encode(anchor.data)); storage.set(ownCert.name.toString(), Encoder.encode(ownCert.data)); - const cs = await CertStorage.create(anchor, ownCert, storage, endpoint, new Uint8Array(ownPrvKeyBits), 800); + const cs = await CertStorage.create(anchor, ownCert, storage, fwAB, new Uint8Array(ownPrvKeyBits), 800); const dataToSign = new Data( name`/${appPrefix}/8=node-0/data`, @@ -237,8 +227,7 @@ Deno.test('Reject unavailable certificate', async () => { closers.use(storage2); const fwAB = Forwarder.create(); closers.defer(() => fwAB.close()); - const endpoint = new Endpoint({ fw: fwAB }); - const responder = new Responder(appPrefix, endpoint, storage2); + const responder = new Responder(appPrefix, fwAB, storage2); closers.use(responder); storage.set(anchor.name.toString(), Encoder.encode(anchor.data)); @@ -246,7 +235,7 @@ Deno.test('Reject unavailable certificate', async () => { // storage2.set(otherCert.name.toString(), Encoder.encode(otherCert.data)); // Certificate is missing - const cs = await CertStorage.create(anchor, ownCert, storage, endpoint, new Uint8Array(ownPrvKeyBits), 800); + const cs = await CertStorage.create(anchor, ownCert, storage, fwAB, new Uint8Array(ownPrvKeyBits), 800); await assertRejects(async () => { await cs.verifier.verify(dataToFetch); }, 'Failed to reject not existing certificates'); @@ -311,8 +300,7 @@ Deno.test('Reject mutual loop', async () => { closers.use(storage2); const fwAB = Forwarder.create(); closers.defer(() => fwAB.close()); - const endpoint = new Endpoint({ fw: fwAB }); - const responder = new Responder(appPrefix, endpoint, storage2); + const responder = new Responder(appPrefix, fwAB, storage2); closers.use(responder); storage.set(anchor.name.toString(), Encoder.encode(anchor.data)); @@ -321,7 +309,7 @@ Deno.test('Reject mutual loop', async () => { storage2.set(otherCert2.name.toString(), Encoder.encode(otherCert2.data)); storage2.set(dataToFetch.name.toString(), Encoder.encode(dataToFetch)); - const cs = await CertStorage.create(anchor, ownCert, storage, endpoint, new Uint8Array(ownPrvKeyBits), 800); + const cs = await CertStorage.create(anchor, ownCert, storage, fwAB, new Uint8Array(ownPrvKeyBits), 800); await assertRejects(async () => { await cs.verifier.verify(dataToFetch); }, 'Failed to reject mutually signed certificates'); @@ -374,15 +362,14 @@ Deno.test('Reject self-signed certificate', async () => { closers.use(storage2); const fwAB = Forwarder.create(); closers.defer(() => fwAB.close()); - const endpoint = new Endpoint({ fw: fwAB }); - const responder = new Responder(appPrefix, endpoint, storage2); + const responder = new Responder(appPrefix, fwAB, storage2); closers.use(responder); storage.set(anchor.name.toString(), Encoder.encode(anchor.data)); storage.set(ownCert.name.toString(), Encoder.encode(ownCert.data)); storage2.set(otherCert.name.toString(), Encoder.encode(otherCert.data)); - const cs = await CertStorage.create(anchor, ownCert, storage, endpoint, new Uint8Array(ownPrvKeyBits), 800); + const cs = await CertStorage.create(anchor, ownCert, storage, fwAB, new Uint8Array(ownPrvKeyBits), 800); await assertRejects(async () => { await cs.verifier.verify(dataToFetch); }, 'Failed to reject self-signed certificates'); diff --git a/src/security/cert-storage.ts b/src/security/cert-storage.ts index 8c6f372..756be9d 100644 --- a/src/security/cert-storage.ts +++ b/src/security/cert-storage.ts @@ -1,7 +1,8 @@ import { Decoder, Encoder } from '@ndn/tlv'; import { Data, Interest, Name, Signer, Verifier } from '@ndn/packet'; import { Certificate, createSigner, createVerifier, ECDSA } from '@ndn/keychain'; -import { Endpoint } from '@ndn/endpoint'; +import * as endpoint from '@ndn/endpoint'; +import type { Forwarder } from '@ndn/fw'; import { Storage } from '../storage/mod.ts'; import { SecurityAgent } from './types.ts'; @@ -18,7 +19,7 @@ export class CertStorage implements SecurityAgent { readonly trustAnchor: Certificate, readonly ownCertificate: Certificate, readonly storage: Storage, - readonly endpoint: Endpoint, + readonly fw: Forwarder, prvKeyBits: Uint8Array, protected readonly interestLifetime = 5000, ) { @@ -64,7 +65,7 @@ export class CertStorage implements SecurityAgent { return undefined; } else { try { - const result = await this.endpoint.consume( + const result = await endpoint.consume( new Interest( keyName, Interest.MustBeFresh, @@ -75,6 +76,7 @@ export class CertStorage implements SecurityAgent { // TODO: Find a better way to handle security verifier: this.localVerifier, retx: 5, + fw: this.fw, }, ); @@ -131,7 +133,7 @@ export class CertStorage implements SecurityAgent { trustAnchor: Certificate, ownCertificate: Certificate, storage: Storage, - endpoint: Endpoint, + fw: Forwarder, prvKeyBits: Uint8Array, interestLifetime = 5000, ) { @@ -139,7 +141,7 @@ export class CertStorage implements SecurityAgent { trustAnchor, ownCertificate, storage, - endpoint, + fw, prvKeyBits, interestLifetime, ); diff --git a/src/storage/deno-kv.ts b/src/storage/deno-kv.ts index 8f4000b..69f67c3 100644 --- a/src/storage/deno-kv.ts +++ b/src/storage/deno-kv.ts @@ -19,6 +19,7 @@ export class DenoKvStorage implements Storage { } async set(key: string, value: Uint8Array | undefined): Promise { + // TODO: There is a size limit await this.kv.set([key], value); } diff --git a/src/sync-agent/deliveries.ts b/src/sync-agent/deliveries.ts index 10e446b..e1011b7 100644 --- a/src/sync-agent/deliveries.ts +++ b/src/sync-agent/deliveries.ts @@ -1,4 +1,5 @@ -import { type Endpoint } from '@ndn/endpoint'; +import * as endpoint from '@ndn/endpoint'; +import { type Forwarder } from '@ndn/fw'; import { StateVector, SvSync, type SyncNode, type SyncUpdate } from '@ndn/svs'; import { Data, digestSigning, Name, Signer, type Verifier } from '@ndn/packet'; import { SequenceNum } from '@ndn/naming-convention2'; @@ -47,7 +48,7 @@ export abstract class SyncDelivery implements AsyncDisposable { // TODO: Use options to configure parameters constructor( readonly nodeId: Name, - readonly endpoint: Endpoint, + readonly fw: Forwarder, readonly syncPrefix: Name, readonly signer: Signer, readonly verifier: Verifier, @@ -66,7 +67,7 @@ export abstract class SyncDelivery implements AsyncDisposable { this._lastTillNow = new StateVector(this.state); SvSync.create({ - endpoint: endpoint, + fw: fw, syncPrefix: syncPrefix, signer: signer, verifier: verifier, @@ -159,7 +160,7 @@ export abstract class SyncDelivery implements AsyncDisposable { this._syncInst.close(); this._syncNode = undefined; const svSync = await SvSync.create({ - endpoint: this.endpoint, + fw: this.fw, syncPrefix: this.syncPrefix, signer: digestSigning, // We can do so because the state has not been set @@ -221,7 +222,7 @@ export abstract class SyncDelivery implements AsyncDisposable { export class AtLeastOnceDelivery extends SyncDelivery { constructor( readonly nodeId: Name, - readonly endpoint: Endpoint, + readonly fw: Forwarder, readonly syncPrefix: Name, readonly signer: Signer, readonly verifier: Verifier, @@ -229,7 +230,7 @@ export class AtLeastOnceDelivery extends SyncDelivery { onUpdatePromise: Promise, protected state?: StateVector, ) { - super(nodeId, endpoint, syncPrefix, signer, verifier, onUpdatePromise, state); + super(nodeId, fw, syncPrefix, signer, verifier, onUpdatePromise, state); } override async handleSyncUpdate(update: SyncUpdate) { @@ -251,7 +252,7 @@ export class AtLeastOnceDelivery extends SyncDelivery { }, ca: new LimitedCwnd(new TcpCubic(), 10), verifier: this.verifier, - endpoint: this.endpoint, + fw: this.fw, // WARN: an abort controller is required! NDNts's fetcher cannot close itself even after // the face is destroyed and there exists no way to send the Interest. // deno-lint-ignore no-explicit-any @@ -353,7 +354,7 @@ export class AtLeastOnceDelivery extends SyncDelivery { static async create( nodeId: Name, - endpoint: Endpoint, + fw: Forwarder, syncPrefix: Name, signer: Signer, verifier: Verifier, @@ -367,7 +368,7 @@ export class AtLeastOnceDelivery extends SyncDelivery { if (encoded) { syncState = parseSyncState(encoded); } - return new AtLeastOnceDelivery(nodeId, endpoint, syncPrefix, signer, verifier, storage, onUpdatePromise, syncState); + return new AtLeastOnceDelivery(nodeId, fw, syncPrefix, signer, verifier, storage, onUpdatePromise, syncState); } override async destroy() { @@ -399,7 +400,7 @@ export class AtLeastOnceDelivery extends SyncDelivery { export class LatestOnlyDelivery extends SyncDelivery { constructor( readonly nodeId: Name, - readonly endpoint: Endpoint, + readonly fw: Forwarder, readonly syncPrefix: Name, readonly signer: Signer, readonly verifier: Verifier, @@ -408,14 +409,14 @@ export class LatestOnlyDelivery extends SyncDelivery { readonly onUpdatePromise: Promise, protected state?: StateVector, ) { - super(nodeId, endpoint, syncPrefix, signer, verifier, onUpdatePromise, state); + super(nodeId, fw, syncPrefix, signer, verifier, onUpdatePromise, state); } override async handleSyncUpdate(update: SyncUpdate) { const prefix = getNamespace().baseName(update.id, this.syncPrefix); const name = prefix.append(SequenceNum.create(update.hiSeqNum)); try { - const data = await this.endpoint.consume(name, { verifier: this.verifier }); + const data = await endpoint.consume(name, { verifier: this.verifier, fw: this.fw }); // Update the storage // Note that this will overwrite old data @@ -460,7 +461,7 @@ export class LatestOnlyDelivery extends SyncDelivery { static async create( nodeId: Name, - endpoint: Endpoint, + fw: Forwarder, syncPrefix: Name, signer: Signer, verifier: Verifier, @@ -478,7 +479,7 @@ export class LatestOnlyDelivery extends SyncDelivery { } return new LatestOnlyDelivery( nodeId, - endpoint, + fw, syncPrefix, signer, verifier, diff --git a/src/sync-agent/delivery-alo.test.ts b/src/sync-agent/delivery-alo.test.ts index e663c2c..3b9314c 100644 --- a/src/sync-agent/delivery-alo.test.ts +++ b/src/sync-agent/delivery-alo.test.ts @@ -1,4 +1,3 @@ -import { Endpoint } from '@ndn/endpoint'; import { Forwarder } from '@ndn/fw'; import { Data, digestSigning, Name, type Signer, type Verifier } from '@ndn/packet'; import { GenericNumber } from '@ndn/naming-convention2'; @@ -17,7 +16,7 @@ type SyncUpdateEvent = { class DeliveryTester implements AsyncDisposable { readonly #closers = new AsyncDisposableStack(); - readonly endpoint: Endpoint; + readonly fwAB: Forwarder; readonly syncPrefix = name`/test/32=alo`; readonly stores; readonly alos = [] as AtLeastOnceDelivery[]; @@ -27,16 +26,15 @@ class DeliveryTester implements AsyncDisposable { readonly svsCount: number, readonly updateEvent?: (evt: SyncUpdateEvent, inst: DeliveryTester) => Promise, ) { - const fwAB = Forwarder.create(); - this.endpoint = new Endpoint({ fw: fwAB }); + this.fwAB = Forwarder.create(); this.#closers.defer(() => { - fwAB.close(); + this.fwAB.close(); }); this.stores = Array.from({ length: svsCount }, (_, i) => { const store = new InMemoryStorage(); this.#closers.use(store); - const responder = new Responder(name`/test/32=node/${i}`, this.endpoint, store); + const responder = new Responder(name`/test/32=node/${i}`, this.fwAB, store); this.#closers.use(responder); return store; }); @@ -46,7 +44,7 @@ class DeliveryTester implements AsyncDisposable { for (let i = 0; this.svsCount > i; i++) { const alo = await AtLeastOnceDelivery.create( name`/test/32=node/${i}`, - this.endpoint, + this.fwAB, this.syncPrefix, signer, verifier, @@ -382,7 +380,7 @@ Deno.test('Alo.3 Recover after shutdown', async () => { // Restart alo 0. It is supposed to deliver 'C' again. tester.alos[0] = await AtLeastOnceDelivery.create( name`/test/32=node/${0}`, - tester.endpoint, + tester.fwAB, tester.syncPrefix, digestSigning, digestSigning, diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index 95b146e..b04bd49 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -1,4 +1,5 @@ -import { Endpoint } from '@ndn/endpoint'; +import * as endpoint from '@ndn/endpoint'; +import type { Forwarder } from '@ndn/fw'; import { Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet'; import { Decoder, Encoder } from '@ndn/tlv'; import { BufferChunkSource, DataProducer, fetch } from '@ndn/segmented-object'; @@ -23,7 +24,7 @@ export class SyncAgent implements AsyncDisposable { readonly appPrefix: Name, readonly persistStorage: Storage, readonly tempStorage: Storage, - readonly endpoint: Endpoint, + readonly fw: Forwarder, readonly signer: Signer, readonly verifier: Verifier, readonly atLeastOnce: AtLeastOnceDelivery, @@ -43,6 +44,7 @@ export class SyncAgent implements AsyncDisposable { describe: 'SyncAgent.serve', routeCapture: false, announcement: appPrefix, + fw: fw, }); // NodeID should be announced to attract traffic. // Design decision: suppose node A, B and C connect to the same network. @@ -53,6 +55,7 @@ export class SyncAgent implements AsyncDisposable { describe: 'SyncAgent.trafficAttractor', routeCapture: false, announcement: nodeId, + fw: fw, }); } @@ -369,7 +372,7 @@ export class SyncAgent implements AsyncDisposable { static async create( nodeId: Name, persistStorage: Storage, - endpoint: Endpoint, + fw: Forwarder, signer: Signer, verifier: Verifier, onReset?: () => void, @@ -385,7 +388,7 @@ export class SyncAgent implements AsyncDisposable { const onUpdatePromise = new Promise((resolve) => resolver = resolve); const latestOnly = await LatestOnlyDelivery.create( nodeId, - endpoint, + fw, lateSyncPrefix, signer, verifier, @@ -395,7 +398,7 @@ export class SyncAgent implements AsyncDisposable { ); const atLeastOnce = await AtLeastOnceDelivery.create( nodeId, - endpoint, + fw, aloSyncPrefix, signer, verifier, @@ -407,7 +410,7 @@ export class SyncAgent implements AsyncDisposable { appPrefix, persistStorage, tempStorage, - endpoint, + fw, signer, verifier, atLeastOnce, diff --git a/src/utils/responder.ts b/src/utils/responder.ts index 98f425f..b4b173e 100644 --- a/src/utils/responder.ts +++ b/src/utils/responder.ts @@ -1,4 +1,5 @@ -import { type Endpoint } from '@ndn/endpoint'; +import { produce } from '@ndn/endpoint'; +import type { Forwarder } from '@ndn/fw'; import { Data, Interest, Name } from '@ndn/packet'; import { Decoder } from '@ndn/tlv'; import { Storage } from '../storage/mod.ts'; @@ -9,15 +10,16 @@ export class Responder implements Disposable { constructor( public readonly prefix: Name, - public readonly endpoint: Endpoint, + public readonly fw: Forwarder, public readonly store: Storage, ) { - this.producer = endpoint.produce(prefix, (interest) => { + this.producer = produce(prefix, (interest) => { return this.serve(interest); }, { describe: `Responder[${prefix.toString()}]`, routeCapture: false, announcement: prefix, + fw: fw, }); } diff --git a/src/workspace/workspace.ts b/src/workspace/workspace.ts index afafb61..64998da 100644 --- a/src/workspace/workspace.ts +++ b/src/workspace/workspace.ts @@ -1,5 +1,5 @@ import { Storage } from '../storage/mod.ts'; -import { Endpoint } from '@ndn/endpoint'; +import type { Forwarder } from '@ndn/fw'; import type { Name, Signer, Verifier } from '@ndn/packet'; import { encodeSyncState, parseSyncState, SyncAgent } from '../sync-agent/mod.ts'; import { NdnSvsAdaptor, YjsStateManager } from '../adaptors/mod.ts'; @@ -9,7 +9,7 @@ export class Workspace implements AsyncDisposable { private constructor( public readonly nodeId: Name, public readonly persistStore: Storage, - public readonly endpoint: Endpoint, + public readonly fw: Forwarder, public readonly onReset: (() => void) | undefined, public readonly syncAgent: SyncAgent, public readonly yjsSnapshotMgr: YjsStateManager, @@ -20,7 +20,7 @@ export class Workspace implements AsyncDisposable { public static async create(opts: { nodeId: Name; persistStore: Storage; - endpoint: Endpoint; + fw: Forwarder; rootDoc: Y.Doc; signer: Signer; verifier: Verifier; @@ -40,7 +40,7 @@ export class Workspace implements AsyncDisposable { const syncAgent = await SyncAgent.create( opts.nodeId, opts.persistStore, - opts.endpoint, + opts.fw, opts.signer, opts.verifier, opts.onReset, @@ -73,7 +73,7 @@ export class Workspace implements AsyncDisposable { return new Workspace( opts.nodeId, opts.persistStore, - opts.endpoint, + opts.fw, opts.onReset, syncAgent, yjsSnapshotMgr,