From 86a7a0a88e21c20b944cbf7aa4a60a01ed892300 Mon Sep 17 00:00:00 2001 From: Xinyu Ma Date: Fri, 23 Feb 2024 04:38:19 -0800 Subject: [PATCH] Add NTSchema's SegmentedObject --- deno.lock | 2 + package.json | 2 + pnpm-lock.yaml | 8 +- src/namespace/base-node.ts | 4 + src/namespace/expressing-point.ts | 17 +- src/namespace/leaf-node.ts | 23 +- src/namespace/mod.ts | 7 + src/namespace/name-pattern.ts | 2 +- src/namespace/nt-schema.test.ts | 38 +--- src/namespace/nt-schema.ts | 17 +- .../segmented-object/fetcher.test.ts | 74 +++++++ src/namespace/segmented-object/fetcher.ts | 197 +++++++++++++++++ .../segmented-object/segmented-object.test.ts | 134 ++++++++++++ .../segmented-object/segmented-object.ts | 206 ++++++++++++++++++ src/utils/mod.ts | 1 + 15 files changed, 688 insertions(+), 44 deletions(-) create mode 100644 src/namespace/segmented-object/fetcher.test.ts create mode 100644 src/namespace/segmented-object/fetcher.ts create mode 100644 src/namespace/segmented-object/segmented-object.test.ts create mode 100644 src/namespace/segmented-object/segmented-object.ts diff --git a/deno.lock b/deno.lock index b5720e3..8e57a11 100644 --- a/deno.lock +++ b/deno.lock @@ -197,12 +197,14 @@ "dependencies": [ "npm:@types/wicg-file-system-access@^2023.10.4", "npm:abortable-iterator@^5.0.1", + "npm:event-iterator@^2.0.0", "npm:eventemitter3@^5.0.1", "npm:it-pushable@^3.2.3", "npm:jose@^5.2.1", "npm:p-defer@^4.0.0", "npm:streaming-iterables@^8.0.1", "npm:tslib@^2.6.2", + "npm:type-fest@^4.10.2", "npm:y-protocols@^1.0.6", "npm:yjs@^13.6.12" ] diff --git a/package.json b/package.json index d557f80..972528f 100644 --- a/package.json +++ b/package.json @@ -19,9 +19,11 @@ "url": "https://github.com/UCLA-IRL/ndnts-aux/issues" }, "dependencies": { + "event-iterator": "^2.0.0", "eventemitter3": "^5.0.1", "jose": "^5.2.1", "tslib": "^2.6.2", + "type-fest": "^4.10.2", "y-protocols": "^1.0.6", "yjs": "^13.6.12" }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e6e424c..4c7aaf8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -5,6 +5,9 @@ settings: excludeLinksFromLockfile: false dependencies: + event-iterator: + specifier: ^2.0.0 + version: 2.0.0 eventemitter3: specifier: ^5.0.1 version: 5.0.1 @@ -14,6 +17,9 @@ dependencies: tslib: specifier: ^2.6.2 version: 2.6.2 + type-fest: + specifier: ^4.10.2 + version: 4.10.2 y-protocols: specifier: ^1.0.6 version: 1.0.6(yjs@13.6.12) @@ -266,7 +272,6 @@ packages: /event-iterator@2.0.0: resolution: {integrity: sha512-KGft0ldl31BZVV//jj+IAIGCxkvvUkkON+ScH6zfoX+l+omX6001ggyRSpI0Io2Hlro0ThXotswCtfzS8UkIiQ==} - dev: true /eventemitter3@5.0.1: resolution: {integrity: sha512-GWkBvjiSZK87ELrYOSESUYeVIc9mvLLf/nXalMOS5dYrgZq9o5OVkbZAVM06CVxYsCwH9BDZFPlQTlPA1j4ahA==} @@ -661,7 +666,6 @@ packages: /type-fest@4.10.2: resolution: {integrity: sha512-anpAG63wSpdEbLwOqH8L84urkL6PiVIov3EMmgIhhThevh9aiMQov+6Btx0wldNcvm4wV+e2/Rt1QdDwKHFbHw==} engines: {node: '>=16'} - dev: true /typescript-event-target@1.1.0: resolution: {integrity: sha512-PMrzUVryhnUq2n8M7tjNHNRuIHlUqly5RfGltBTpPCdVpbytgALTRDegF/t6+mFmmtBVhOqEYlbjVNBxwabIug==} diff --git a/src/namespace/base-node.ts b/src/namespace/base-node.ts index 5800af4..bac600d 100644 --- a/src/namespace/base-node.ts +++ b/src/namespace/base-node.ts @@ -15,6 +15,10 @@ export class BaseNode { public readonly onDetach = new EventChain(); protected handler: NamespaceHandler | undefined = undefined; + constructor(public readonly describe?: string) { + this.describe ??= this.constructor.name; + } + public get namespaceHandler() { return this.handler; } diff --git a/src/namespace/expressing-point.ts b/src/namespace/expressing-point.ts index 45aeea7..c3fd70a 100644 --- a/src/namespace/expressing-point.ts +++ b/src/namespace/expressing-point.ts @@ -33,7 +33,7 @@ export interface ExpressingPointEvents extends BaseNodeEvents { } export type ExpressingPointOpts = { - lifetimeMs: number; + lifetimeMs?: number; interestSigner?: Signer; canBePrefix?: boolean; mustBeFresh?: boolean; @@ -53,8 +53,9 @@ export class ExpressingPoint extends BaseNode { constructor( public readonly config: ExpressingPointOpts, + describe?: string, ) { - super(); + super(describe); } public searchCache(target: schemaTree.StrictMatch, interest: Interest, deadline: number) { @@ -134,8 +135,9 @@ export class ExpressingPoint extends BaseNode { signer?: Signer; lifetimeMs?: number; deadline?: number; + verifier?: Verifier; } = {}, - ): Promise { + ): Promise { // Construct Interest, but without signing, so the parameter digest is not there const interestName = this.handler!.attachedPrefix!.append(...matched.name.comps); const interestArgs = [interestName] as Array; @@ -156,6 +158,11 @@ export class ExpressingPoint extends BaseNode { } // TODO: FwHint is not supported for now. Who should provide this info? const lifetimeMs = opts.lifetimeMs ?? this.config.lifetimeMs; + if (lifetimeMs === undefined) { + throw new Error( + `[${this.describe}:need] Unable to generate Interest when lifetimeMs is missing in config.`, + ); + } interestArgs.push(Interest.Lifetime(lifetimeMs)); const interest = new Interest(...interestArgs); @@ -185,7 +192,7 @@ export class ExpressingPoint extends BaseNode { // Express the Interest if not surpressed const supressInterest = opts.supressInterest ?? this.config.supressInterest; if (supressInterest) { - return undefined; + throw new Error(`Interest surpressed: ${interestName.toString()} @${this.describe}`); } const data = await this.handler!.endpoint!.consume(interest, { @@ -193,7 +200,7 @@ export class ExpressingPoint extends BaseNode { signal: opts.abortSignal as any, retx: this.config.retx, // Note: the verifier is at the LeafNode if CanBePrefix is set - verifier: this.handler!.getVerifier(deadline), + verifier: opts.verifier ?? this.handler!.getVerifier(deadline), }); // (no await) Save (cache) the data in the storage diff --git a/src/namespace/leaf-node.ts b/src/namespace/leaf-node.ts index f7ba415..446c4ec 100644 --- a/src/namespace/leaf-node.ts +++ b/src/namespace/leaf-node.ts @@ -17,8 +17,8 @@ export interface LeafNodeEvents extends ExpressingPointEvents { } export type LeafNodeOpts = ExpressingPointOpts & { - freshnessMs: number; - signer: Signer; + freshnessMs?: number; + signer?: Signer; validityMs?: number; contentType?: number; }; @@ -29,8 +29,9 @@ export class LeafNode extends ExpressingPoint { constructor( public readonly config: LeafNodeOpts, + describe?: string, ) { - super(config); + super(config, describe); } public override async storeData( @@ -58,21 +59,29 @@ export class LeafNode extends ExpressingPoint { signer?: Signer; finalBlockId?: Component; } = {}, - ): Promise { + ): Promise { const payload = content instanceof Uint8Array ? content : new TextEncoder().encode(content); + const signer = opts.signer ?? this.config.signer; + const freshnessMs = opts.freshnessMs ?? this.config.freshnessMs; + if (!signer || freshnessMs === undefined) { + throw new Error( + `[${this.describe}:provide] Unable to generate Data when signer or freshnessMs is missing in config.`, + ); + } + // Create Data const dataName = this.handler!.attachedPrefix!.append(...matched.name.comps); const data = new Data( dataName, Data.ContentType(this.config.contentType ?? 0), // Default is BLOB - Data.FreshnessPeriod(opts.freshnessMs ?? this.config.freshnessMs), + Data.FreshnessPeriod(freshnessMs), payload, ); if (opts.finalBlockId) { data.finalBlockId = opts.finalBlockId; } - await this.config.signer.sign(data); + await signer.sign(data); const wire = Encoder.encode(data); const validity = this.config.validityMs ?? 876000 * 3600000; @@ -86,6 +95,6 @@ export class LeafNode extends ExpressingPoint { validUntil, }); - return wire; + return data; } } diff --git a/src/namespace/mod.ts b/src/namespace/mod.ts index e69de29..a3ca918 100644 --- a/src/namespace/mod.ts +++ b/src/namespace/mod.ts @@ -0,0 +1,7 @@ +export * as Pattern from './name-pattern.ts'; +export * as Tree from './schema-tree.ts'; +export { pattern } from './name-pattern.ts'; +export * from './nt-schema.ts'; +export * from './base-node.ts'; +export * from './expressing-point.ts'; +export * from './leaf-node.ts'; diff --git a/src/namespace/name-pattern.ts b/src/namespace/name-pattern.ts index 83f1e8e..4639ce2 100644 --- a/src/namespace/name-pattern.ts +++ b/src/namespace/name-pattern.ts @@ -95,7 +95,7 @@ export const makeStep = ( return pattern; } else { const value = mapping[pattern.tag]; - if (!value) { + if (value === undefined) { throw new Error(`The pattern variable ${pattern.tag} does not exist in the mapping.`); } const v = typeof value === 'number' ? Encoder.encode(NNI(value)) : value; diff --git a/src/namespace/nt-schema.test.ts b/src/namespace/nt-schema.test.ts index da9f50c..544502e 100644 --- a/src/namespace/nt-schema.test.ts +++ b/src/namespace/nt-schema.test.ts @@ -8,11 +8,8 @@ import { Ed25519, generateSigningKey } from '@ndn/keychain'; import { NtSchema, VerifyResult } from './nt-schema.ts'; import { InMemoryStorage } from '../storage/mod.ts'; import { LeafNode } from './leaf-node.ts'; -import * as namePattern from './name-pattern.ts'; import * as Tree from './schema-tree.ts'; -import { BaseNode } from './base-node.ts'; -const { pattern } = namePattern; export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value); Deno.test('NtSchema.1 Basic Interest and Data', async () => { @@ -25,17 +22,12 @@ Deno.test('NtSchema.1 Basic Interest and Data', async () => { // NTSchema side const schema = new NtSchema(); - const leaf = new LeafNode({ + const leafNode = schema.set('/records/<8=recordId:string>', LeafNode, { lifetimeMs: 100, freshnessMs: 60000, signer: digestSigning, }); - const leafNode = Tree.touch( - schema.tree, - pattern`/records/<8=recordId:string>`, - leaf, - ); - leaf.onVerify.addListener(async ({ pkt }) => { + leafNode.resource!.onVerify.addListener(async ({ pkt }) => { try { await digestSigning.verify(pkt); return VerifyResult.Pass; @@ -43,7 +35,7 @@ Deno.test('NtSchema.1 Basic Interest and Data', async () => { return VerifyResult.Fail; } }); - leaf.onInterest.addListener(async () => { + leafNode.resource!.onInterest.addListener(async () => { const data3 = new Data( name`${appPrefix}/records/8=rec3`, Data.FreshnessPeriod(60000), @@ -113,17 +105,12 @@ Deno.test('NtSchema.2 Data Storage', async () => { // NTSchema side const schema = new NtSchema(); const storageA = new InMemoryStorage(); - const leaf = new LeafNode({ + const leafNode = schema.set('/records/<8=recordId:string>', LeafNode, { lifetimeMs: 100, freshnessMs: 60000, signer: digestSigning, }); - const leafNode = Tree.touch( - schema.tree, - pattern`/records/<8=recordId:string>`, - leaf, - ); - leaf.onVerify.addListener(async ({ pkt }) => { + leafNode.resource!.onVerify.addListener(async ({ pkt }) => { try { await digestSigning.verify(pkt); return VerifyResult.Pass; @@ -131,8 +118,8 @@ Deno.test('NtSchema.2 Data Storage', async () => { return VerifyResult.Fail; } }); - leaf.onSaveStorage.addListener(({ data, wire }) => storageA.set(data.name.toString(), wire)); - leaf.onSearchStorage.addListener(async ({ interest }) => { + leafNode.resource!.onSaveStorage.addListener(({ data, wire }) => storageA.set(data.name.toString(), wire)); + leafNode.resource!.onSearchStorage.addListener(async ({ interest }) => { const wire = await storageA.get(interest.name.toString()); if (wire) { return Decoder.decode(wire, Data); @@ -200,17 +187,12 @@ Deno.test('NtSchema.3 Verification', async () => { // NTSchema side const schema = new NtSchema(); - const leaf = new LeafNode({ + const leafNode = schema.set('/records/<8=recordId:string>', LeafNode, { lifetimeMs: 100, freshnessMs: 60000, signer: digestSigning, }); - const leafNode = Tree.touch( - schema.tree, - pattern`/records/<8=recordId:string>`, - leaf, - ); - leaf.onVerify.addListener(async ({ pkt, prevResult }) => { + leafNode.resource!.onVerify.addListener(async ({ pkt, prevResult }) => { if (pkt.sigInfo?.type === SigType.Sha256) { try { await digestSigning.verify(pkt); @@ -222,7 +204,7 @@ Deno.test('NtSchema.3 Verification', async () => { return prevResult; } }); - leaf.onVerify.addListener(async ({ pkt, prevResult }) => { + leafNode.resource!.onVerify.addListener(async ({ pkt, prevResult }) => { if (pkt.sigInfo?.type === SigType.Ed25519) { try { await pubKey.verify(pkt); diff --git a/src/namespace/nt-schema.ts b/src/namespace/nt-schema.ts index 04dfefd..cb5d47f 100644 --- a/src/namespace/nt-schema.ts +++ b/src/namespace/nt-schema.ts @@ -1,6 +1,6 @@ import { Endpoint, Producer } from '@ndn/endpoint'; import { Data, Interest, Name, type Verifier } from '@ndn/packet'; -// import * as namePattern from './name-pattern.ts'; +import * as namePattern from './name-pattern.ts'; import * as schemaTree from './schema-tree.ts'; import { type BaseNode } from './base-node.ts'; @@ -94,6 +94,21 @@ export class NtSchema implements NamespaceHandler, AsyncDisposable { this._attachedPrefix = undefined; } + public set, T extends BaseNode>( + path: string | namePattern.Pattern, + klass: new (...args: Args) => T, + ...args: Args + ): schemaTree.Node { + if (typeof path === 'string') { + path = namePattern.fromString(path); + } + return schemaTree.touch( + this.tree, + path, + new klass(...args), + ); + } + async [Symbol.asyncDispose]() { if (this._producer) { await this.detach(); diff --git a/src/namespace/segmented-object/fetcher.test.ts b/src/namespace/segmented-object/fetcher.test.ts new file mode 100644 index 0000000..7cab498 --- /dev/null +++ b/src/namespace/segmented-object/fetcher.test.ts @@ -0,0 +1,74 @@ +import { assert } from '../../dep.ts'; +import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts'; +import { Endpoint } from '@ndn/endpoint'; +import { Data, digestSigning } from '@ndn/packet'; +import { Encoder } from '@ndn/tlv'; +import { Bridge } from '@ndn/l3face'; +import { NtSchema, VerifyResult } from '../nt-schema.ts'; +import { InMemoryStorage } from '../../storage/mod.ts'; +import { LeafNode } from '../leaf-node.ts'; +import { Fetcher } from './fetcher.ts'; + +export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value); + +Deno.test('Fetcher.1 Basic fetching', async () => { + using bridge = Bridge.create({}); + const { fwA, fwB } = bridge; + const epA = new Endpoint({ fw: fwA }); + const epB = new Endpoint({ fw: fwB }); + await using closers = new AsyncDisposableStack(); + const appPrefix = name`/prefix`; + + // NTSchema side + const schema = new NtSchema(); + const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {}); + leafNode.resource!.onVerify.addListener(async ({ pkt }) => { + try { + await digestSigning.verify(pkt); + return VerifyResult.Pass; + } catch { + return VerifyResult.Fail; + } + }); + await schema.attach(appPrefix, epA); + closers.defer(async () => await schema.detach()); + + // Responder side + const storage = new InMemoryStorage(); + closers.use(storage); + const responder = new Responder(appPrefix, epB, storage); + closers.use(responder); + const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`]; + for (const [i, p] of payloads.entries()) { + const data = new Data( + name`${appPrefix}/object/50=${i}`, + Data.FreshnessPeriod(1000), + p, + ); + data.finalBlockId = name`50=${payloads.length - 1}`.get(0); + await digestSigning.sign(data); + await storage.set(data.name.toString(), Encoder.encode(data)); + } + + // Fetching Object + const results = payloads.map(() => 'MISSING'); + const fetcher = new Fetcher(leafNode, {}, {}); + fetcher.onSegment.addListener((segNum, data) => { + // results[segNum] = data.content === payloads[segNum] ? 'CORRECT' : 'WRONG'; + assert.assertEquals(data.content, payloads[segNum]); + results[segNum] = 'CORRECT'; + return Promise.resolve(); + }); + const { promise: finishPromise, resolve: resolveFinish } = Promise.withResolvers(); + fetcher.onEnd.addListener(() => { + resolveFinish(); + return Promise.resolve(); + }); + fetcher.onError.addListener((err) => { + throw new Error(`Fetching failed: ${err}`); + }); + fetcher.run(); // Concurrently without await + await finishPromise; + + assert.assertEquals(results, ['CORRECT', 'CORRECT', 'CORRECT']); +}); diff --git a/src/namespace/segmented-object/fetcher.ts b/src/namespace/segmented-object/fetcher.ts new file mode 100644 index 0000000..1940af5 --- /dev/null +++ b/src/namespace/segmented-object/fetcher.ts @@ -0,0 +1,197 @@ +// Modified from @ndn/segmented-object/src/fetch/fetcher.ts +// ISC License +// Copyright (c) 2019-2024, Junxiao Shi +import { Data, NamingConvention, Verifier } from '@ndn/packet'; +import * as Pattern from '../name-pattern.ts'; +import * as Schema from '../schema-tree.ts'; +// @deno-types="@ndn/segmented-object/lib/fetch/logic.d.ts" +import { FetchLogic } from '@ndn/segmented-object/lib/fetch/logic_browser.js'; +import { LeafNode } from '../mod.ts'; +import { NNI } from '@ndn/tlv'; +import { EventChain } from '../../utils/mod.ts'; + +export type SegmentConvention = NamingConvention; +export type PipelineOptions = FetchLogic.Options; + +export interface EventMap { + /** Emitted when a Data segment arrives. Note: it is blocking */ + segment(segNum: number, data: Data): Promise; + /** Emitted after all data chunks arrive. */ + end(): Promise; + /** Emitted upon error. */ + error(err: Error): Promise; +} + +type PipelinePacket = { + segNum: number; + matched: Schema.MatchedObject; + lifetimeMs: number; + toCancel: boolean; + internalId: number; +}; + +/** Fetch Data packets as guided by FetchLogic. */ +export class Fetcher { + #count = 0; + #internalId = 0; + #hasFailed = false; + readonly #intSignals: Map = new Map(); + readonly #mapping: Pattern.Mapping; + readonly #handleAbort; + readonly segmentPattern: string; + readonly verifier: Verifier | undefined; + + public readonly onSegment = new EventChain(); + public readonly onEnd = new EventChain(); + public readonly onError = new EventChain(); + + /** Number of segments retrieved so far. */ + public get count() { + return this.#count; + } + + private readonly logic: FetchLogic; + private readonly signal?: AbortSignal; + private readonly lifetimeAfterRto!: number; + + constructor( + /** The leaf node representing the segments */ + readonly node: Schema.Node, + prefixMapping: Pattern.Mapping, + opts: FetcherOptions, + ) { + this.signal = opts.signal; + this.lifetimeAfterRto = opts.lifetimeAfterRto ?? 1000; + this.segmentPattern = opts.segmentPattern ?? (node.upEdge as Pattern.PatternComponent).tag; + this.verifier = opts.verifier; + this.#mapping = { ...prefixMapping }; + + this.logic = new FetchLogic(opts); + this.logic.addEventListener('end', async () => { + await this.onEnd.emit(); + this.close(); + }); + this.logic.addEventListener('exceedRetxLimit', ({ detail: segNum }) => { + this.fail(new Error(`cannot retrieve segment ${segNum}`)); + }); + + this.#handleAbort = () => { + this.fail(new Error('fetch aborted')); + }; + this.signal?.addEventListener('abort', this.#handleAbort); + } + + public close(): void { + this.signal?.removeEventListener('abort', this.#handleAbort); + this.logic.close(); + } + + /** + * Pause outgoing Interests, for backpressure from Data consumer. + * @returns Function for resuming. + */ + public pause() { + return this.logic.pause(); + } + + public async run() { + const sender = this.logic.outgoing( + ({ segNum, rto }) => + ({ + segNum, + matched: Schema.apply(this.node, { ...this.#mapping, [this.segmentPattern]: segNum }), + lifetimeMs: rto + this.lifetimeAfterRto, + toCancel: false, + internalId: this.#internalId++, + }) satisfies PipelinePacket, + ({ interest }) => ({ ...interest, toCancel: true }) satisfies PipelinePacket, + ) as AsyncGenerator; + + for await (const action of sender) { + if (action.toCancel) { + // TODO: Should use TX at NTSchema to avoid overhead + this.#intSignals.get(action.internalId)?.abort(); + this.#intSignals.delete(action.internalId); + } else { + // Create Interest without waiting + this.shoot(action); + } + } + } + + private async shoot({ segNum, matched, lifetimeMs, internalId }: PipelinePacket) { + const abortCtl = new AbortController(); + const abortSignal = abortCtl.signal; + this.#intSignals.set(internalId, abortCtl); + + try { + const verifier = this.verifier; + const data = await Schema.call(matched, 'need', { abortSignal, lifetimeMs, verifier }); + + const now = this.logic.now(); + this.logic.satisfy(segNum, now, false); // TODO: congestionMark + if (data.isFinalBlock) { + this.logic.setFinalSegNum(segNum); + } else if (data.finalBlockId && data.finalBlockId.type === this.node.upEdge?.type) { + this.logic.setFinalSegNum(NNI.decode(data.finalBlockId.value), true); + } + ++this.#count; + await this.onSegment.emit(segNum, data); + } catch (err) { + if (err instanceof Error && err.message.startsWith('Interest rejected')) { + // Silently ignore timeouts + } else { + // Pass verification error + this.fail(new Error(`failed to fetch segment ${segNum}: ${err}`)); + } + } + } + + private fail(err: Error): void { + if (this.#hasFailed) { + return; + } + this.#hasFailed = true; + setTimeout(async () => { + await this.onError.emit(err); + this.close(); + }, 0); + } +} + +export interface FetcherOptions extends FetchLogic.Options { + /** AbortSignal that allows canceling the Interest via AbortController. */ + signal?: AbortSignal; + + /** + * InterestLifetime added to RTO. + * @defaultValue 1000ms + * + * @remarks + * Ignored if `lifetime` is set. + */ + lifetimeAfterRto?: number; + + /** + * The name of the pattern variable for segment + * @defaultValue parent edge of node + */ + segmentPattern?: string; + + /** + * The verifier to verify received segments. + * By default, the one configured at the LeafNode will be used. + */ + verifier?: Verifier; +} + +export interface SegmentData { + segNum: number; + data: Data; +} + +export class SegmentDataEvent extends Event implements SegmentData { + constructor(type: string, public readonly segNum: number, public readonly data: Data) { + super(type); + } +} diff --git a/src/namespace/segmented-object/segmented-object.test.ts b/src/namespace/segmented-object/segmented-object.test.ts new file mode 100644 index 0000000..b7b2af9 --- /dev/null +++ b/src/namespace/segmented-object/segmented-object.test.ts @@ -0,0 +1,134 @@ +import { assert } from '../../dep.ts'; +import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts'; +import { Endpoint } from '@ndn/endpoint'; +import { Data, digestSigning } from '@ndn/packet'; +import { Decoder, Encoder } from '@ndn/tlv'; +import { Bridge } from '@ndn/l3face'; +import { BufferChunkSource, fetch } from '@ndn/segmented-object'; +import { NtSchema, VerifyResult } from '../nt-schema.ts'; +import { InMemoryStorage } from '../../storage/mod.ts'; +import { LeafNode } from '../leaf-node.ts'; +import * as Tree from '../schema-tree.ts'; +import { SegmentedObject } from './segmented-object.ts'; + +export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value); + +Deno.test('SegmentedObject.1 Basic fetching', async () => { + using bridge = Bridge.create({}); + const { fwA, fwB } = bridge; + const epA = new Endpoint({ fw: fwA }); + const epB = new Endpoint({ fw: fwB }); + await using closers = new AsyncDisposableStack(); + const appPrefix = name`/prefix`; + + // NTSchema side + const schema = new NtSchema(); + const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {}); + leafNode.resource!.onVerify.addListener(async ({ pkt }) => { + try { + await digestSigning.verify(pkt); + return VerifyResult.Pass; + } catch { + return VerifyResult.Fail; + } + }); + const segObjNode = schema.set('/object', SegmentedObject, { + leafNode, + lifetimeAfterRto: 100, + }); + await schema.attach(appPrefix, epA); + closers.defer(async () => await schema.detach()); + + // Responder side + const storage = new InMemoryStorage(); + closers.use(storage); + const responder = new Responder(appPrefix, epB, storage); + closers.use(responder); + const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`]; + for (const [i, p] of payloads.entries()) { + const data = new Data( + name`${appPrefix}/object/50=${i}`, + Data.FreshnessPeriod(1000), + p, + ); + data.finalBlockId = name`50=${payloads.length - 1}`.get(0); + await digestSigning.sign(data); + await storage.set(data.name.toString(), Encoder.encode(data)); + } + + // Fetching Object + const results = []; + const matched = Tree.apply(segObjNode, {}); + const needed = Tree.call(matched, 'need'); + for await (const r of needed) { + results.push(r.content); + } + + assert.assertEquals(results, payloads); +}); + +Deno.test('SegmentedObject.2 Basic provide', async () => { + using bridge = Bridge.create({}); + const { fwA, fwB } = bridge; + const epA = new Endpoint({ fw: fwA }); + await using closers = new AsyncDisposableStack(); + const appPrefix = name`/prefix`; + + // NTSchema side + const schema = new NtSchema(); + const storageA = new InMemoryStorage(); + const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, { + freshnessMs: 60000, + signer: digestSigning, + }); + leafNode.resource!.onVerify.addListener(async ({ pkt }) => { + try { + await digestSigning.verify(pkt); + return VerifyResult.Pass; + } catch { + return VerifyResult.Fail; + } + }); + leafNode.resource!.onSaveStorage.addListener(({ data, wire }) => storageA.set(data.name.toString(), wire)); + leafNode.resource!.onSearchStorage.addListener(async ({ interest }) => { + const wire = await storageA.get(interest.name.toString()); + if (wire) { + return Decoder.decode(wire, Data); + } else { + return undefined; + } + }); + const segObjNode = schema.set('/object', SegmentedObject, { + leafNode, + lifetimeAfterRto: 100, + }); + await schema.attach(appPrefix, epA); + closers.defer(async () => await schema.detach()); + + // Provide object + const matched = Tree.apply(segObjNode, {}); + await Tree.call( + matched, + 'provide', + new BufferChunkSource( + b`SEG1 SEG2 SEG3;`, + { chunkSize: 5 }, + ), + ); + + // Fetching Object + const results = []; + const result = fetch(name`${appPrefix}/object/`, { + fw: fwB, + verifier: digestSigning, + modifyInterest: { mustBeFresh: true }, + lifetimeAfterRto: 2000, + // default naming convention is 50= + }); + for await (const segment of result) { + // Reassemble + results.push(segment.content); + } + + assert.assertEquals(results, [b`SEG1 `, b`SEG2 `, b`SEG3;`]); +}); diff --git a/src/namespace/segmented-object/segmented-object.ts b/src/namespace/segmented-object/segmented-object.ts new file mode 100644 index 0000000..6185db5 --- /dev/null +++ b/src/namespace/segmented-object/segmented-object.ts @@ -0,0 +1,206 @@ +import { Component, Data, Signer, Verifier } from '@ndn/packet'; +import { assert, concatBuffers, Reorder } from '@ndn/util'; +import type { ChunkSource } from '@ndn/segmented-object'; +import { Encoder, NNI } from '@ndn/tlv'; +import { collect, map, type WritableStreamish, writeToStream } from 'streaming-iterables'; +import type { Promisable } from 'type-fest'; +import { EventIterator } from 'event-iterator'; +import { BaseNode } from '../base-node.ts'; +import * as Pattern from '../name-pattern.ts'; +import * as Schema from '../schema-tree.ts'; +import { LeafNode } from '../mod.ts'; +import { Fetcher, FetcherOptions, PipelineOptions } from './fetcher.ts'; + +export interface Result extends PromiseLike, AsyncIterable { + /** Iterate over Data packets as they arrive, not sorted in segment number order. */ + unordered: () => AsyncIterable; + + /** Iterate over payload chunks in segment number order. */ + chunks: () => AsyncIterable; + + /** Write all chunks to the destination stream. */ + pipe: (dest: WritableStreamish) => Promise; + + /** Number of segments retrieved so far. */ + readonly count: number; +} + +type SegmentData = { + segNum: number; + data: Data; +}; + +class FetchResult implements Result { + constructor( + private readonly node: Schema.Node, + private readonly prefixMapping: Pattern.Mapping, + private readonly opts: FetcherOptions, + ) {} + + public get count(): number { + return this.ctx?.count ?? 0; + } + private ctx?: Fetcher; + private promise?: Promise; + + private startFetcher() { + assert(!this.ctx, 'FetchResult is already used'); + const ctx = new Fetcher(this.node, this.prefixMapping, this.opts); + this.ctx = ctx; + ctx.run(); + return new EventIterator(({ push, stop, fail, on }) => { + let resume: (() => void) | undefined; + on('highWater', () => { + resume = ctx.pause(); + }); + on('lowWater', () => { + resume?.(); + }); + + const abort = new AbortController(); + ctx.onSegment.addListener((segNum, data) => Promise.resolve(push({ segNum, data }))); + ctx.onEnd.addListener(() => Promise.resolve(stop())); + // ctx.addEventListener('error', ({ detail }) => fail(detail), { signal: abort.signal }); + const errorListener = (err: Error) => Promise.resolve(fail(err)); + ctx.onError.addListener(errorListener); + abort.signal.addEventListener('abort', () => { + ctx.onError.removeListener(errorListener); + }); + return () => { + resume?.(); + abort.abort(); + }; + }); + } + + public unordered() { + return map( + ({ data, segNum }) => Object.assign(data, { segNum }), + this.startFetcher(), + ); + } + + private async *ordered() { + const reorder = new Reorder(this.opts.segmentRange?.[0]); + for await (const { segNum, data } of this.startFetcher()) { + reorder.push(segNum, data); + yield* reorder.shift(); + } + assert(reorder.empty, `${reorder.size} leftover segments`); + } + + public chunks() { + return map((data) => data.content, this.ordered()); + } + + public pipe(dest: WritableStreamish) { + return writeToStream(dest, this.chunks()); + } + + private async startPromise() { + const chunks = await collect(this.chunks()); + return concatBuffers(chunks); + } + + public then( + onfulfilled?: ((value: Uint8Array) => Promisable) | null, + onrejected?: ((reason: unknown) => Promisable) | null, + ) { + this.promise ??= this.startPromise(); + return this.promise.then(onfulfilled, onrejected); + } + + public [Symbol.asyncIterator]() { + return this.ordered()[Symbol.asyncIterator](); + } +} + +export type SegmentedObjectOpts = { + /** The leaf node representing the segments */ + leafNode: Schema.Node; + + /** + * InterestLifetime added to RTO. + * @defaultValue 1000ms + * + * @remarks + * Ignored if `lifetime` is set. + */ + lifetimeAfterRto?: number; + + /** + * The name of the pattern variable for segment + * @defaultValue parent edge of node + */ + segmentPattern?: string; + + /** + * The Component type of the segment + * @defaultValue parent edge of node + */ + segmentType?: number; + + pipelineOpts?: PipelineOptions; +}; + +export class SegmentedObject extends BaseNode { + lifetimeAfterRto: number; + segmentPattern: string; + segmentType: number; + leafNode: Schema.Node; + pipelineOpts: PipelineOptions; + + constructor( + config: SegmentedObjectOpts, + describe?: string, + ) { + super(describe); + this.leafNode = config.leafNode; + this.lifetimeAfterRto = config.lifetimeAfterRto ?? 1000; + this.segmentPattern = config.segmentPattern ?? (this.leafNode.upEdge as Pattern.PatternComponent).tag; + this.segmentType = config.segmentType ?? this.leafNode.upEdge!.type; + this.pipelineOpts = config.pipelineOpts ?? {}; + } + + public async provide( + matched: Schema.StrictMatch, + source: ChunkSource, + opts: { + freshnessMs?: number; + validityMs?: number; + signer?: Signer; + } = {}, + ) { + const results = []; + for await (const chunk of source.listChunks()) { + const leaf = Schema.apply(this.leafNode, { + ...matched.mapping, + [this.segmentPattern]: chunk.i, + }) as Schema.StrictMatch; + results.push( + await Schema.call(leaf, 'provide', chunk.payload, { + freshnessMs: opts.freshnessMs, + validityMs: opts.validityMs, + signer: opts.signer, + finalBlockId: chunk.final ? new Component(this.segmentType, Encoder.encode(NNI(chunk.final))) : undefined, + }), + ); + } + return results; + } + + public need( + matched: Schema.StrictMatch, + opts: { + abortSignal?: AbortSignal; + lifetimeAfterRto?: number; + verifier?: Verifier; + } = {}, + ): Result { + return new FetchResult(this.leafNode, matched.mapping, { + ...this.pipelineOpts, + ...opts, + segmentPattern: this.segmentPattern, + }); + } +} diff --git a/src/utils/mod.ts b/src/utils/mod.ts index 3ce9c07..cb3035e 100644 --- a/src/utils/mod.ts +++ b/src/utils/mod.ts @@ -9,3 +9,4 @@ export * from './signals.ts'; export * from './name-lit.ts'; export * from './disposable-stacks.ts'; export * from './responder.ts'; +export * from './event-chain.ts';