From 712fca23642a742c5982c7d87c5b0ec2bcfc3bfc Mon Sep 17 00:00:00 2001
From: Xinyu Ma
Date: Mon, 6 May 2024 15:02:59 -0700
Subject: [PATCH] Update NDNts segment fetcher

---
 deno.lock                                     |   2 +-
 package.json                                  |   2 +-
 pnpm-lock.yaml                                |  97 +++--
 .../segmented-object/fetcher.test.ts          | 130 +++---
 src/namespace/segmented-object/fetcher.ts     | 394 ++++++++---------
 .../segmented-object/segmented-object.test.ts | 238 +++++-----
 .../segmented-object/segmented-object.ts      | 412 +++++++++---------
 7 files changed, 638 insertions(+), 637 deletions(-)

diff --git a/deno.lock b/deno.lock
index 402b4b8..8806158 100644
--- a/deno.lock
+++ b/deno.lock
@@ -205,7 +205,7 @@
       "npm:p-defer@^4.0.1",
       "npm:streaming-iterables@^8.0.1",
       "npm:tslib@^2.6.2",
-      "npm:type-fest@^4.18.0",
+      "npm:type-fest@^4.18.2",
       "npm:uuid@^9.0.1",
       "npm:wait-your-turn@^1.0.1",
       "npm:y-protocols@^1.0.6",
diff --git a/package.json b/package.json
index 786fe56..2ed2480 100644
--- a/package.json
+++ b/package.json
@@ -25,7 +25,7 @@
     "eventemitter3": "^5.0.1",
     "jose": "^5.2.4",
     "tslib": "^2.6.2",
-    "type-fest": "^4.18.0",
+    "type-fest": "^4.18.2",
     "uuid": "^9.0.1",
     "wait-your-turn": "^1.0.1",
     "y-protocols": "^1.0.6",
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 65791cd..071cfea 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -27,8 +27,8 @@ importers:
         specifier: ^2.6.2
         version: 2.6.2
       type-fest:
-        specifier: ^4.18.0
-        version: 4.18.0
+        specifier: ^4.18.2
+        version: 4.18.2
       uuid:
         specifier: ^9.0.1
         version: 9.0.1
@@ -137,80 +137,80 @@ packages:

   '@ndn/endpoint@https://ndnts-nightly.ndn.today/endpoint.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/endpoint.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/fw@https://ndnts-nightly.ndn.today/fw.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/fw.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/keychain@https://ndnts-nightly.ndn.today/keychain.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/keychain.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/l3face@https://ndnts-nightly.ndn.today/l3face.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/l3face.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/lp@https://ndnts-nightly.ndn.today/lp.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/lp.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/naming-convention2@https://ndnts-nightly.ndn.today/naming-convention2.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/naming-convention2.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/ndncert@https://ndnts-nightly.ndn.today/ndncert.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/ndncert.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/ndnsec@https://ndnts-nightly.ndn.today/ndnsec.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/ndnsec.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/nfdmgmt@https://ndnts-nightly.ndn.today/nfdmgmt.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/nfdmgmt.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/node-transport@https://ndnts-nightly.ndn.today/node-transport.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/node-transport.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/packet@https://ndnts-nightly.ndn.today/packet.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/packet.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/rdr@https://ndnts-nightly.ndn.today/rdr.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/rdr.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/repo-api@https://ndnts-nightly.ndn.today/repo-api.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/repo-api.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/segmented-object@https://ndnts-nightly.ndn.today/segmented-object.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/segmented-object.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/svs@https://ndnts-nightly.ndn.today/svs.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/svs.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/sync-api@https://ndnts-nightly.ndn.today/sync-api.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/sync-api.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/tlv@https://ndnts-nightly.ndn.today/tlv.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/tlv.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@ndn/util@https://ndnts-nightly.ndn.today/util.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/util.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
-    engines: {node: ^18.18.0 || ^20.10.0 || ^21.0.0}
+    version: 0.0.20240505-nightly-86a5aaa
+    engines: {node: ^20.12.0 || ^22.0.0}

   '@ndn/ws-transport@https://ndnts-nightly.ndn.today/ws-transport.tgz':
     resolution: {tarball: https://ndnts-nightly.ndn.today/ws-transport.tgz}
-    version: 0.0.20240421-nightly-ca71ef1
+    version: 0.0.20240505-nightly-86a5aaa

   '@noble/ed25519@2.1.0':
     resolution: {integrity: sha512-KM4qTyXPinyCgMzeYJH/UudpdL+paJXtY3CHtHYZQtBkS8MZoPr4rOikZllIutJe0d06QDQKisyn02gxZ8TcQA==}
@@ -230,11 +230,11 @@
   '@types/minimalistic-assert@1.0.3':
     resolution: {integrity: sha512-Ku87cam4YxiXcEpeUemo+vO8QWGQ7U2CwEEcLcYFhxG8b4CK8gWxSX/oWjePWKwqPaWWxxVqXAdAjGdlJtVzDA==}

-  '@types/node@20.12.7':
-    resolution: {integrity: sha512-wq0cICSkRLVaf3UGLMGItu/PtdY7oaXaI/RVU+xliKVOtRna3PRY57ZDfztpDL0n11vfymMUnXv8QwYCO7L1wg==}
+  '@types/node@20.12.10':
+    resolution: {integrity: sha512-Eem5pH9pmWBHoGAT8Dr5fdc5rYA+4NAovdM4EktRPVAAiJhmWWfQrA0cFhAbOsQdSfIHjAud6YdkbL69+zSKjw==}

-  '@types/nodemailer@6.4.14':
-    resolution: {integrity: sha512-fUWthHO9k9DSdPCSPRqcu6TWhYyxTBg382vlNIttSe9M7XfsT06y0f24KHXtbnijPGGRIcVvdKHTNikOI6qiHA==}
+  '@types/nodemailer@6.4.15':
+    resolution: {integrity: sha512-0EBJxawVNjPkng1zm2vopRctuWVCxk34JcIlRuXSf54habUWdz1FB7wHDqOqvDa8Mtpt0Q3LTXQkAs2LNyK5jQ==}

   '@types/retry@0.12.5':
     resolution: {integrity: sha512-3xSjTp3v03X/lSQLkczaN9UIEwJMoMCA1+Nb5HfbJEQWogdeQIyVtTvxPXDQjZ5zws8rFQfVfRdz03ARihPJgw==}
@@ -473,8 +473,8 @@
     resolution: {integrity: sha512-+P72GAjVAbTxjjwUmwjVrqrdZROD4nf8KgpBoDxqXXTiYZZt/ud60dE5yvCSr9lRO8e8yv6kgJIC0K0PfZFVQw==}
     hasBin: true

-  node-gyp-build@4.8.0:
-    resolution: {integrity: sha512-u6fs2AEUljNho3EYTJNBfImO5QTo/J/1Etd+NVdCj7qWKUSN/bSLkZwhDv7I+w/MSC6qJ4cknepkAYykDdK8og==}
+  node-gyp-build@4.8.1:
+    resolution: {integrity: sha512-OSs33Z9yWr148JZcbZd5WiAXhh/n9z8TxQcdMhIOlpN9AhWpLfvVFO73+m77bBABQMaY9XSvIa+qk0jlI7Gcaw==}
     hasBin: true

   nodemailer@6.9.13:
@@ -585,8 +585,8 @@
   tslib@2.6.2:
     resolution: {integrity: sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==}

-  type-fest@4.18.0:
-    resolution: {integrity: sha512-+dbmiyliDY/2TTcjCS7NpI9yV2iEFlUDk5TKnsbkN7ZoRu5s7bT+zvYtNFhFXC2oLwURGT2frACAZvbbyNBI+w==}
+  type-fest@4.18.2:
+    resolution: {integrity: sha512-+suCYpfJLAe4OXS6+PPXjW3urOS4IoP9waSiLuXfLgqZODKw/aWwASvzqE886wA0kQgGy0mIWyhd87VpqIy6Xg==}
     engines: {node: '>=16'}

   typescript-event-target@1.1.1:
@@ -697,7 +697,7 @@ snapshots:
       idb-keyval: 6.2.1
      mnemonist: 0.39.8
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2
       wait-your-turn: 1.0.1

   '@ndn/l3face@https://ndnts-nightly.ndn.today/l3face.tgz':
@@ -712,7 +712,7 @@ snapshots:
       retry: 0.13.1
       streaming-iterables: 8.0.1
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2
       typescript-event-target: 1.1.1

   '@ndn/lp@https://ndnts-nightly.ndn.today/lp.tgz':
@@ -740,14 +740,14 @@ snapshots:
       '@ndn/tlv': https://ndnts-nightly.ndn.today/tlv.tgz
       '@ndn/util': https://ndnts-nightly.ndn.today/util.tgz
       '@types/imap': 0.8.40
-      '@types/nodemailer': 6.4.14
+      '@types/nodemailer': 6.4.15
       ajv: 8.13.0
       b64-lite: 1.4.0
       imap-emails: 1.0.4(tslib@2.6.2)
       nodemailer: 6.9.13
       p-timeout: 6.1.2
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2
       typescript-event-target: 1.1.1

   '@ndn/ndnsec@https://ndnts-nightly.ndn.today/ndnsec.tgz':
@@ -778,7 +778,7 @@ snapshots:
       event-iterator: 2.0.0
       p-event: 6.0.1
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2
       url-format-lax: 2.0.0
       url-parse-lax: 5.0.0

@@ -789,7 +789,7 @@ snapshots:
       buffer-compare: 1.1.1
       mnemonist: 0.39.8
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2

   '@ndn/rdr@https://ndnts-nightly.ndn.today/rdr.tgz':
     dependencies:
@@ -824,9 +824,10 @@ snapshots:
       '@ndn/util': https://ndnts-nightly.ndn.today/util.tgz
       event-iterator: 2.0.0
       hirestime: 7.0.4
+      it-keepalive: 1.2.0
       mnemonist: 0.39.8
+      obliterator: 2.0.4
       p-defer: 4.0.1
-      p-event: 6.0.1
       p-lazy: 4.0.0
       streaming-iterables: 8.0.1
       tslib: 2.6.2
@@ -845,7 +846,7 @@ snapshots:
       '@ndn/util': https://ndnts-nightly.ndn.today/util.tgz
       streaming-iterables: 8.0.1
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2
       typescript-event-target: 1.1.1

   '@ndn/sync-api@https://ndnts-nightly.ndn.today/sync-api.tgz':
@@ -860,7 +861,7 @@ snapshots:
       '@ndn/util': https://ndnts-nightly.ndn.today/util.tgz
       mnemonist: 0.39.8
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2

   '@ndn/util@https://ndnts-nightly.ndn.today/util.tgz':
     dependencies:
@@ -870,7 +871,7 @@ snapshots:
       minimalistic-assert: 1.0.1
       streaming-iterables: 8.0.1
       tslib: 2.6.2
-      type-fest: 4.18.0
+      type-fest: 4.18.2
       wait-your-turn: 1.0.1

   '@ndn/ws-transport@https://ndnts-nightly.ndn.today/ws-transport.tgz':
@@ -900,17 +901,17 @@ snapshots:
   '@types/imap@0.8.40':
     dependencies:
-      '@types/node': 20.12.7
+      '@types/node': 20.12.10

   '@types/minimalistic-assert@1.0.3': {}

-  '@types/node@20.12.7':
+  '@types/node@20.12.10':
     dependencies:
       undici-types: 5.26.5

-  '@types/nodemailer@6.4.14':
+  '@types/nodemailer@6.4.15':
     dependencies:
-      '@types/node': 20.12.7
+      '@types/node': 20.12.10

   '@types/retry@0.12.5': {}
@@ -920,7 +921,7 @@ snapshots:
   '@types/ws@8.5.10':
     dependencies:
-      '@types/node': 20.12.7
+      '@types/node': 20.12.10

   '@yoursunny/asn1@0.0.20200718':
     dependencies:
@@ -960,7 +961,7 @@ snapshots:
   bufferutil@4.0.8:
     dependencies:
-      node-gyp-build: 4.8.0
+      node-gyp-build: 4.8.1
     optional: true

   cbor-extract@2.2.0:
@@ -1172,7 +1173,7 @@ snapshots:
       detect-libc: 2.0.3
       optional: true

-  node-gyp-build@4.8.0:
+  node-gyp-build@4.8.1:
     optional: true

   nodemailer@6.9.13: {}
@@ -1251,7 +1252,7 @@ snapshots:
   tslib@2.6.2: {}

-  type-fest@4.18.0: {}
+  type-fest@4.18.2: {}

   typescript-event-target@1.1.1: {}
diff --git a/src/namespace/segmented-object/fetcher.test.ts b/src/namespace/segmented-object/fetcher.test.ts
index b4e1d8d..bf21c64 100644
--- a/src/namespace/segmented-object/fetcher.test.ts
+++ b/src/namespace/segmented-object/fetcher.test.ts
@@ -1,71 +1,71 @@
-import { assert } from '../../dep.ts';
-import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts';
-import { Data, digestSigning } from '@ndn/packet';
-import { Encoder } from '@ndn/tlv';
-import { Bridge } from '@ndn/l3face';
-import { NtSchema, VerifyResult } from '../nt-schema.ts';
-import { InMemoryStorage } from '../../storage/mod.ts';
-import { LeafNode } from '../leaf-node.ts';
-import { Fetcher } from './fetcher.ts';
+// import { assert } from '../../dep.ts';
+// import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts';
+// import { Data, digestSigning } from '@ndn/packet';
+// import { Encoder } from '@ndn/tlv';
+// import { Bridge } from '@ndn/l3face';
+// import { NtSchema, VerifyResult } from '../nt-schema.ts';
+// import { InMemoryStorage } from '../../storage/mod.ts';
+// import { LeafNode } from '../leaf-node.ts';
+// import { Fetcher } from './fetcher.ts';

-export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value);
+// export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value);

-Deno.test('Fetcher.1 Basic fetching', async () => {
-  using bridge = Bridge.create({});
-  const { fwA, fwB } = bridge;
-  await using closers = new AsyncDisposableStack();
-  const appPrefix = name`/prefix`;
+// Deno.test('Fetcher.1 Basic fetching', async () => {
+//   using bridge = Bridge.create({});
+//   const { fwA, fwB } = bridge;
+//   await using closers = new AsyncDisposableStack();
+//   const appPrefix = name`/prefix`;

-  // NTSchema side
-  const schema = new NtSchema();
-  const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {});
-  leafNode.resource!.onVerify.addListener(async ({ pkt }) => {
-    try {
-      await digestSigning.verify(pkt);
-      return VerifyResult.Pass;
-    } catch {
-      return VerifyResult.Fail;
-    }
-  });
-  await schema.attach(appPrefix, fwA);
-  closers.defer(async () => await schema.detach());
+//   // NTSchema side
+//   const schema = new NtSchema();
+//   const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {});
+//   leafNode.resource!.onVerify.addListener(async ({ pkt }) => {
+//     try {
+//       await digestSigning.verify(pkt);
+//       return VerifyResult.Pass;
+//     } catch {
+//       return VerifyResult.Fail;
+//     }
+//   });
+//   await schema.attach(appPrefix, fwA);
+//   closers.defer(async () => await schema.detach());

-  // Responder side
-  const storage = new InMemoryStorage();
-  closers.use(storage);
-  const responder = new Responder(appPrefix, fwB, storage);
-  closers.use(responder);
-  const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`];
-  for (const [i, p] of payloads.entries()) {
-    const data = new Data(
-      name`${appPrefix}/object/50=${i}`,
-      Data.FreshnessPeriod(1000),
-      p,
-    );
-    data.finalBlockId = name`50=${payloads.length - 1}`.get(0);
-    await digestSigning.sign(data);
-    await storage.set(data.name.toString(), Encoder.encode(data));
-  }
+//   // Responder side
+//   const storage = new InMemoryStorage();
+//   closers.use(storage);
+//   const responder = new Responder(appPrefix, fwB, storage);
+//   closers.use(responder);
+//   const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`];
+//   for (const [i, p] of payloads.entries()) {
+//     const data = new Data(
+//       name`${appPrefix}/object/50=${i}`,
+//       Data.FreshnessPeriod(1000),
+//       p,
+//     );
+//     data.finalBlockId = name`50=${payloads.length - 1}`.get(0);
+//     await digestSigning.sign(data);
+//     await storage.set(data.name.toString(), Encoder.encode(data));
+//   }

-  // Fetching Object
-  const results = payloads.map(() => 'MISSING');
-  const fetcher = new Fetcher(leafNode, {}, {});
-  fetcher.onSegment.addListener((segNum, data) => {
-    // results[segNum] = data.content === payloads[segNum] ? 'CORRECT' : 'WRONG';
-    assert.assertEquals(data.content, payloads[segNum]);
-    results[segNum] = 'CORRECT';
-    return Promise.resolve();
-  });
-  const { promise: finishPromise, resolve: resolveFinish } = Promise.withResolvers<void>();
-  fetcher.onEnd.addListener(() => {
-    resolveFinish();
-    return Promise.resolve();
-  });
-  fetcher.onError.addListener((err) => {
-    throw new Error(`Fetching failed: ${err}`);
-  });
-  fetcher.run(); // Concurrently without await
-  await finishPromise;
+//   // Fetching Object
+//   const results = payloads.map(() => 'MISSING');
+//   const fetcher = new Fetcher(leafNode, {}, {});
+//   fetcher.onSegment.addListener((segNum, data) => {
+//     // results[segNum] = data.content === payloads[segNum] ? 'CORRECT' : 'WRONG';
+//     assert.assertEquals(data.content, payloads[segNum]);
+//     results[segNum] = 'CORRECT';
+//     return Promise.resolve();
+//   });
+//   const { promise: finishPromise, resolve: resolveFinish } = Promise.withResolvers<void>();
+//   fetcher.onEnd.addListener(() => {
+//     resolveFinish();
+//     return Promise.resolve();
+//   });
+//   fetcher.onError.addListener((err) => {
+//     throw new Error(`Fetching failed: ${err}`);
+//   });
+//   fetcher.run(); // Concurrently without await
+//   await finishPromise;

-  assert.assertEquals(results, ['CORRECT', 'CORRECT', 'CORRECT']);
-});
+//   assert.assertEquals(results, ['CORRECT', 'CORRECT', 'CORRECT']);
+// });
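Note: the commented-out test above completes the event-driven Fetcher by hand-wiring Promise.withResolvers. As a minimal sketch (assuming the Fetcher/EventChain API from fetcher.ts below; `fetchAll` is a hypothetical helper, not part of this patch), the same completion logic can be factored out:

// Hypothetical helper: drain a Fetcher into an ordered array of segment
// payloads, resolving on `end` and rejecting on `error`.
const fetchAll = (fetcher: Fetcher): Promise<Uint8Array[]> => {
  const segments: Uint8Array[] = [];
  const { promise, resolve, reject } = Promise.withResolvers<Uint8Array[]>();
  fetcher.onSegment.addListener((segNum, data) => {
    segments[segNum] = data.content; // segments may arrive out of order
    return Promise.resolve();
  });
  fetcher.onEnd.addListener(() => {
    resolve(segments);
    return Promise.resolve();
  });
  fetcher.onError.addListener((err) => {
    reject(err);
    return Promise.resolve();
  });
  fetcher.run(); // runs concurrently; completion is reported via onEnd/onError
  return promise;
};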
diff --git a/src/namespace/segmented-object/fetcher.ts b/src/namespace/segmented-object/fetcher.ts
index 42f37a1..ef1f5eb 100644
--- a/src/namespace/segmented-object/fetcher.ts
+++ b/src/namespace/segmented-object/fetcher.ts
@@ -1,197 +1,197 @@
-// Modified from @ndn/segmented-object/src/fetch/fetcher.ts
-// ISC License
-// Copyright (c) 2019-2024, Junxiao Shi
-import { Data, NamingConvention, Verifier } from '@ndn/packet';
-import * as Pattern from '../name-pattern.ts';
-import * as Schema from '../schema-tree.ts';
-// @deno-types="@ndn/segmented-object/lib/fetch/logic.d.ts"
-import { FetchLogic } from '@ndn/segmented-object/lib/fetch/logic_browser.js';
-import { LeafNode } from '../leaf-node.ts';
-import { NNI } from '@ndn/tlv';
-import { EventChain } from '../../utils/mod.ts';
-
-export type SegmentConvention = NamingConvention<number>;
-export type PipelineOptions = FetchLogic.Options;
-
-export interface EventMap {
-  /** Emitted when a Data segment arrives. Note: it is blocking */
-  segment(segNum: number, data: Data): Promise<void>;
-  /** Emitted after all data chunks arrive. */
-  end(): Promise<void>;
-  /** Emitted upon error. */
-  error(err: Error): Promise<void>;
-}
-
-type PipelinePacket = {
-  segNum: number;
-  matched: Schema.MatchedObject<LeafNode>;
-  lifetimeMs: number;
-  toCancel: boolean;
-  internalId: number;
-};
-
-/** Fetch Data packets as guided by FetchLogic. */
-export class Fetcher {
-  #count = 0;
-  #internalId = 0;
-  #hasFailed = false;
-  readonly #intSignals: Map<number, AbortController> = new Map();
-  readonly #mapping: Pattern.Mapping;
-  readonly #handleAbort;
-  readonly segmentPattern: string;
-  readonly verifier: Verifier | undefined;
-
-  public readonly onSegment = new EventChain<EventMap['segment']>();
-  public readonly onEnd = new EventChain<EventMap['end']>();
-  public readonly onError = new EventChain<EventMap['error']>();
-
-  /** Number of segments retrieved so far. */
-  public get count() {
-    return this.#count;
-  }
-
-  private readonly logic: FetchLogic;
-  private readonly signal?: AbortSignal;
-  private readonly lifetimeAfterRto!: number;
-
-  constructor(
-    /** The leaf node representing the segments */
-    readonly node: Schema.Node<LeafNode>,
-    prefixMapping: Pattern.Mapping,
-    opts: FetcherOptions,
-  ) {
-    this.signal = opts.signal;
-    this.lifetimeAfterRto = opts.lifetimeAfterRto ?? 1000;
-    this.segmentPattern = opts.segmentPattern ?? (node.upEdge as Pattern.PatternComponent).tag;
-    this.verifier = opts.verifier;
-    this.#mapping = { ...prefixMapping };
-
-    this.logic = new FetchLogic(opts);
-    this.logic.addEventListener('end', async () => {
-      await this.onEnd.emit();
-      this.close();
-    });
-    this.logic.addEventListener('exceedRetxLimit', ({ detail: segNum }) => {
-      this.fail(new Error(`cannot retrieve segment ${segNum}`));
-    });
-
-    this.#handleAbort = () => {
-      this.fail(new Error('fetch aborted'));
-    };
-    this.signal?.addEventListener('abort', this.#handleAbort);
-  }
-
-  public close(): void {
-    this.signal?.removeEventListener('abort', this.#handleAbort);
-    this.logic.close();
-  }
-
-  /**
-   * Pause outgoing Interests, for backpressure from Data consumer.
-   * @returns Function for resuming.
-   */
-  public pause() {
-    return this.logic.pause();
-  }
-
-  public async run() {
-    const sender = this.logic.outgoing(
-      ({ segNum, rto }) =>
-        ({
-          segNum,
-          matched: Schema.apply(this.node, { ...this.#mapping, [this.segmentPattern]: segNum }),
-          lifetimeMs: rto + this.lifetimeAfterRto,
-          toCancel: false,
-          internalId: this.#internalId++,
-        }) satisfies PipelinePacket,
-      ({ interest }) => ({ ...interest, toCancel: true }) satisfies PipelinePacket,
-    ) as AsyncGenerator<PipelinePacket>;
-
-    for await (const action of sender) {
-      if (action.toCancel) {
-        // TODO: Should use TX at NTSchema to avoid overhead
-        this.#intSignals.get(action.internalId)?.abort();
-        this.#intSignals.delete(action.internalId);
-      } else {
-        // Create Interest without waiting
-        this.shoot(action);
-      }
-    }
-  }
-
-  private async shoot({ segNum, matched, lifetimeMs, internalId }: PipelinePacket) {
-    const abortCtl = new AbortController();
-    const abortSignal = abortCtl.signal;
-    this.#intSignals.set(internalId, abortCtl);
-
-    try {
-      const verifier = this.verifier;
-      const data = await Schema.call(matched, 'need', { abortSignal, lifetimeMs, verifier });
-
-      const now = this.logic.now();
-      this.logic.satisfy(segNum, now, false); // TODO: congestionMark
-      if (data.isFinalBlock) {
-        this.logic.setFinalSegNum(segNum);
-      } else if (data.finalBlockId && data.finalBlockId.type === this.node.upEdge?.type) {
-        this.logic.setFinalSegNum(NNI.decode(data.finalBlockId.value), true);
-      }
-      ++this.#count;
-      await this.onSegment.emit(segNum, data);
-    } catch (err) {
-      if (err instanceof Error && err.message.startsWith('Interest rejected')) {
-        // Silently ignore timeouts
-      } else {
-        // Pass verification error
-        this.fail(new Error(`failed to fetch segment ${segNum}: ${err}`));
-      }
-    }
-  }
-
-  private fail(err: Error): void {
-    if (this.#hasFailed) {
-      return;
-    }
-    this.#hasFailed = true;
-    setTimeout(async () => {
-      await this.onError.emit(err);
-      this.close();
-    }, 0);
-  }
-}
-
-export interface FetcherOptions extends FetchLogic.Options {
-  /** AbortSignal that allows canceling the Interest via AbortController. */
-  signal?: AbortSignal;
-
-  /**
-   * InterestLifetime added to RTO.
-   * @defaultValue 1000ms
-   *
-   * @remarks
-   * Ignored if `lifetime` is set.
-   */
-  lifetimeAfterRto?: number;
-
-  /**
-   * The name of the pattern variable for segment
-   * @defaultValue parent edge of node
-   */
-  segmentPattern?: string;
-
-  /**
-   * The verifier to verify received segments.
-   * By default, the one configured at the LeafNode will be used.
-   */
-  verifier?: Verifier;
-}
-
-export interface SegmentData {
-  segNum: number;
-  data: Data;
-}
-
-export class SegmentDataEvent extends Event implements SegmentData {
-  constructor(type: string, public readonly segNum: number, public readonly data: Data) {
-    super(type);
-  }
-}
+// // Modified from @ndn/segmented-object/src/fetch/fetcher.ts
+// // ISC License
+// // Copyright (c) 2019-2024, Junxiao Shi
+// import { Data, NamingConvention, Verifier } from '@ndn/packet';
+// import * as Pattern from '../name-pattern.ts';
+// import * as Schema from '../schema-tree.ts';
+// // @deno-types="@ndn/segmented-object/lib/fetch/logic.d.ts"
+// import { FetchLogic } from '@ndn/segmented-object/lib/fetch/logic_browser.js';
+// import { LeafNode } from '../leaf-node.ts';
+// import { NNI } from '@ndn/tlv';
+// import { EventChain } from '../../utils/mod.ts';
+
+// export type SegmentConvention = NamingConvention<number>;
+// export type PipelineOptions = FetchLogic.Options;
+
+// export interface EventMap {
+//   /** Emitted when a Data segment arrives. Note: it is blocking */
+//   segment(segNum: number, data: Data): Promise<void>;
+//   /** Emitted after all data chunks arrive. */
+//   end(): Promise<void>;
+//   /** Emitted upon error. */
+//   error(err: Error): Promise<void>;
+// }
+
+// type PipelinePacket = {
+//   segNum: number;
+//   matched: Schema.MatchedObject<LeafNode>;
+//   lifetimeMs: number;
+//   toCancel: boolean;
+//   internalId: number;
+// };
+
+// /** Fetch Data packets as guided by FetchLogic. */
+// export class Fetcher {
+//   #count = 0;
+//   #internalId = 0;
+//   #hasFailed = false;
+//   readonly #intSignals: Map<number, AbortController> = new Map();
+//   readonly #mapping: Pattern.Mapping;
+//   readonly #handleAbort;
+//   readonly segmentPattern: string;
+//   readonly verifier: Verifier | undefined;
+
+//   public readonly onSegment = new EventChain<EventMap['segment']>();
+//   public readonly onEnd = new EventChain<EventMap['end']>();
+//   public readonly onError = new EventChain<EventMap['error']>();
+
+//   /** Number of segments retrieved so far. */
+//   public get count() {
+//     return this.#count;
+//   }
+
+//   private readonly logic: FetchLogic;
+//   private readonly signal?: AbortSignal;
+//   private readonly lifetimeAfterRto!: number;
+
+//   constructor(
+//     /** The leaf node representing the segments */
+//     readonly node: Schema.Node<LeafNode>,
+//     prefixMapping: Pattern.Mapping,
+//     opts: FetcherOptions,
+//   ) {
+//     this.signal = opts.signal;
+//     this.lifetimeAfterRto = opts.lifetimeAfterRto ?? 1000;
+//     this.segmentPattern = opts.segmentPattern ?? (node.upEdge as Pattern.PatternComponent).tag;
+//     this.verifier = opts.verifier;
+//     this.#mapping = { ...prefixMapping };
+
+//     this.logic = new FetchLogic(opts);
+//     this.logic.addEventListener('end', async () => {
+//       await this.onEnd.emit();
+//       this.close();
+//     });
+//     this.logic.addEventListener('exceedRetxLimit', ({ detail: segNum }) => {
+//       this.fail(new Error(`cannot retrieve segment ${segNum}`));
+//     });
+
+//     this.#handleAbort = () => {
+//       this.fail(new Error('fetch aborted'));
+//     };
+//     this.signal?.addEventListener('abort', this.#handleAbort);
+//   }
+
+//   public close(): void {
+//     this.signal?.removeEventListener('abort', this.#handleAbort);
+//     this.logic.close();
+//   }
+
+//   /**
+//    * Pause outgoing Interests, for backpressure from Data consumer.
+//    * @returns Function for resuming.
+//    */
+//   public pause() {
+//     return this.logic.pause();
+//   }
+
+//   public async run() {
+//     const sender = this.logic.outgoing(
+//       ({ segNum, rto }) =>
+//         ({
+//           segNum,
+//           matched: Schema.apply(this.node, { ...this.#mapping, [this.segmentPattern]: segNum }),
+//           lifetimeMs: rto + this.lifetimeAfterRto,
+//           toCancel: false,
+//           internalId: this.#internalId++,
+//         }) satisfies PipelinePacket,
+//       ({ interest }) => ({ ...interest, toCancel: true }) satisfies PipelinePacket,
+//     ) as AsyncGenerator<PipelinePacket>;
+
+//     for await (const action of sender) {
+//       if (action.toCancel) {
+//         // TODO: Should use TX at NTSchema to avoid overhead
+//         this.#intSignals.get(action.internalId)?.abort();
+//         this.#intSignals.delete(action.internalId);
+//       } else {
+//         // Create Interest without waiting
+//         this.shoot(action);
+//       }
+//     }
+//   }
+
+//   private async shoot({ segNum, matched, lifetimeMs, internalId }: PipelinePacket) {
+//     const abortCtl = new AbortController();
+//     const abortSignal = abortCtl.signal;
+//     this.#intSignals.set(internalId, abortCtl);
+
+//     try {
+//       const verifier = this.verifier;
+//       const data = await Schema.call(matched, 'need', { abortSignal, lifetimeMs, verifier });
+
+//       const now = this.logic.now();
+//       this.logic.satisfy(segNum, now, false); // TODO: congestionMark
+//       if (data.isFinalBlock) {
+//         this.logic.setFinalSegNum(segNum);
+//       } else if (data.finalBlockId && data.finalBlockId.type === this.node.upEdge?.type) {
+//         this.logic.setFinalSegNum(NNI.decode(data.finalBlockId.value), true);
+//       }
+//       ++this.#count;
+//       await this.onSegment.emit(segNum, data);
+//     } catch (err) {
+//       if (err instanceof Error && err.message.startsWith('Interest rejected')) {
+//         // Silently ignore timeouts
+//       } else {
+//         // Pass verification error
+//         this.fail(new Error(`failed to fetch segment ${segNum}: ${err}`));
+//       }
+//     }
+//   }
+
+//   private fail(err: Error): void {
+//     if (this.#hasFailed) {
+//       return;
+//     }
+//     this.#hasFailed = true;
+//     setTimeout(async () => {
+//       await this.onError.emit(err);
+//       this.close();
+//     }, 0);
+//   }
+// }
+
+// export interface FetcherOptions extends FetchLogic.Options {
+//   /** AbortSignal that allows canceling the Interest via AbortController. */
+//   signal?: AbortSignal;
+
+//   /**
+//    * InterestLifetime added to RTO.
+//    * @defaultValue 1000ms
+//    *
+//    * @remarks
+//    * Ignored if `lifetime` is set.
+//    */
+//   lifetimeAfterRto?: number;
+
+//   /**
+//    * The name of the pattern variable for segment
+//    * @defaultValue parent edge of node
+//    */
+//   segmentPattern?: string;
+
+//   /**
+//    * The verifier to verify received segments.
+//    * By default, the one configured at the LeafNode will be used.
+//    */
+//   verifier?: Verifier;
+// }
+
+// export interface SegmentData {
+//   segNum: number;
+//   data: Data;
+// }
+
+// export class SegmentDataEvent extends Event implements SegmentData {
+//   constructor(type: string, public readonly segNum: number, public readonly data: Data) {
+//     super(type);
+//   }
+// }
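Note: shoot() above infers the last segment from a Data packet's FinalBlockId. A worked sketch of that convention, using the same NNI and Component calls that fetcher.ts and segmented-object.ts themselves use (component type 50 matches the tests; the variable names are illustrative):

// Producer side: mark the last segment of a three-segment object (0, 1, 2).
import { Component } from '@ndn/packet';
import { Encoder, NNI } from '@ndn/tlv';

const lastSeg = 2;
const finalBlockId = new Component(50, Encoder.encode(NNI(lastSeg)));
// Consumer side: when finalBlockId.type matches the segment component type,
// the final segment number can be recovered, as shoot() does:
const finalSegNum = NNI.decode(finalBlockId.value); // 2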
diff --git a/src/namespace/segmented-object/segmented-object.test.ts b/src/namespace/segmented-object/segmented-object.test.ts
index e7ff87a..f4bb0b1 100644
--- a/src/namespace/segmented-object/segmented-object.test.ts
+++ b/src/namespace/segmented-object/segmented-object.test.ts
@@ -1,130 +1,130 @@
-import { assert } from '../../dep.ts';
-import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts';
-import { Data, digestSigning } from '@ndn/packet';
-import { Decoder, Encoder } from '@ndn/tlv';
-import { Bridge } from '@ndn/l3face';
-import { BufferChunkSource, fetch } from '@ndn/segmented-object';
-import { NtSchema, VerifyResult } from '../nt-schema.ts';
-import { InMemoryStorage } from '../../storage/mod.ts';
-import { LeafNode } from '../leaf-node.ts';
-import * as Tree from '../schema-tree.ts';
-import { SegmentedObjectNode } from './segmented-object.ts';
+// import { assert } from '../../dep.ts';
+// import { AsyncDisposableStack, name, Responder } from '../../utils/mod.ts';
+// import { Data, digestSigning } from '@ndn/packet';
+// import { Decoder, Encoder } from '@ndn/tlv';
+// import { Bridge } from '@ndn/l3face';
+// import { BufferChunkSource, fetch } from '@ndn/segmented-object';
+// import { NtSchema, VerifyResult } from '../nt-schema.ts';
+// import { InMemoryStorage } from '../../storage/mod.ts';
+// import { LeafNode } from '../leaf-node.ts';
+// import * as Tree from '../schema-tree.ts';
+// import { SegmentedObjectNode } from './segmented-object.ts';

-export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value);
+// export const b = ([value]: TemplateStringsArray) => new TextEncoder().encode(value);

-Deno.test('SegmentedObject.1 Basic fetching', async () => {
-  using bridge = Bridge.create({});
-  const { fwA, fwB } = bridge;
-  await using closers = new AsyncDisposableStack();
-  const appPrefix = name`/prefix`;
+// Deno.test('SegmentedObject.1 Basic fetching', async () => {
+//   using bridge = Bridge.create({});
+//   const { fwA, fwB } = bridge;
+//   await using closers = new AsyncDisposableStack();
+//   const appPrefix = name`/prefix`;

-  // NTSchema side
-  const schema = new NtSchema();
-  const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {});
-  leafNode.resource!.onVerify.addListener(async ({ pkt }) => {
-    try {
-      await digestSigning.verify(pkt);
-      return VerifyResult.Pass;
-    } catch {
-      return VerifyResult.Fail;
-    }
-  });
-  const segObjNode = schema.set('/object', SegmentedObjectNode, {
-    leafNode,
-    lifetimeAfterRto: 100,
-  });
-  await schema.attach(appPrefix, fwA);
-  closers.defer(async () => await schema.detach());
+//   // NTSchema side
+//   const schema = new NtSchema();
+//   const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {});
+//   leafNode.resource!.onVerify.addListener(async ({ pkt }) => {
+//     try {
+//       await digestSigning.verify(pkt);
+//       return VerifyResult.Pass;
+//     } catch {
+//       return VerifyResult.Fail;
+//     }
+//   });
+//   const segObjNode = schema.set('/object', SegmentedObjectNode, {
+//     leafNode,
+//     lifetimeAfterRto: 100,
+//   });
+//   await schema.attach(appPrefix, fwA);
+//   closers.defer(async () => await schema.detach());

-  // Responder side
-  const storage = new InMemoryStorage();
-  closers.use(storage);
-  const responder = new Responder(appPrefix, fwB, storage);
-  closers.use(responder);
-  const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`];
-  for (const [i, p] of payloads.entries()) {
-    const data = new Data(
-      name`${appPrefix}/object/50=${i}`,
-      Data.FreshnessPeriod(1000),
-      p,
-    );
-    data.finalBlockId = name`50=${payloads.length - 1}`.get(0);
-    await digestSigning.sign(data);
-    await storage.set(data.name.toString(), Encoder.encode(data));
-  }
+//   // Responder side
+//   const storage = new InMemoryStorage();
+//   closers.use(storage);
+//   const responder = new Responder(appPrefix, fwB, storage);
+//   closers.use(responder);
+//   const payloads = [b`SEG1 `, b`SEG2 `, b`SEG3;`];
+//   for (const [i, p] of payloads.entries()) {
+//     const data = new Data(
+//       name`${appPrefix}/object/50=${i}`,
+//       Data.FreshnessPeriod(1000),
+//       p,
+//     );
+//     data.finalBlockId = name`50=${payloads.length - 1}`.get(0);
+//     await digestSigning.sign(data);
+//     await storage.set(data.name.toString(), Encoder.encode(data));
+//   }

-  // Fetching Object
-  const results = [];
-  const matched = Tree.apply(segObjNode, {});
-  const needed = Tree.call(matched, 'need');
-  for await (const r of needed) {
-    results.push(r.content);
-  }
+//   // Fetching Object
+//   const results = [];
+//   const matched = Tree.apply(segObjNode, {});
+//   const needed = Tree.call(matched, 'need');
+//   for await (const r of needed) {
+//     results.push(r.content);
+//   }

-  assert.assertEquals(results, payloads);
-});
+//   assert.assertEquals(results, payloads);
+// });

-Deno.test('SegmentedObject.2 Basic provide', async () => {
-  using bridge = Bridge.create({});
-  const { fwA, fwB } = bridge;
-  await using closers = new AsyncDisposableStack();
-  const appPrefix = name`/prefix`;
+// Deno.test('SegmentedObject.2 Basic provide', async () => {
+//   using bridge = Bridge.create({});
+//   const { fwA, fwB } = bridge;
+//   await using closers = new AsyncDisposableStack();
+//   const appPrefix = name`/prefix`;

-  // NTSchema side
-  const schema = new NtSchema();
-  const storageA = new InMemoryStorage();
-  const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {
-    freshnessMs: 60000,
-    signer: digestSigning,
-  });
-  leafNode.resource!.onVerify.addListener(async ({ pkt }) => {
-    try {
-      await digestSigning.verify(pkt);
-      return VerifyResult.Pass;
-    } catch {
-      return VerifyResult.Fail;
-    }
-  });
-  leafNode.resource!.onSaveStorage.addListener(({ data, wire }) => storageA.set(data.name.toString(), wire));
-  leafNode.resource!.onSearchStorage.addListener(async ({ interest }) => {
-    const wire = await storageA.get(interest.name.toString());
-    if (wire) {
-      return Decoder.decode(wire, Data);
-    } else {
-      return undefined;
-    }
-  });
-  const segObjNode = schema.set('/object', SegmentedObjectNode, {
-    leafNode,
-    lifetimeAfterRto: 100,
-  });
-  await schema.attach(appPrefix, fwA);
-  closers.defer(async () => await schema.detach());
+//   // NTSchema side
+//   const schema = new NtSchema();
+//   const storageA = new InMemoryStorage();
+//   const leafNode = schema.set('/object/<50=segNum:number>', LeafNode, {
+//     freshnessMs: 60000,
+//     signer: digestSigning,
+//   });
+//   leafNode.resource!.onVerify.addListener(async ({ pkt }) => {
+//     try {
+//       await digestSigning.verify(pkt);
+//       return VerifyResult.Pass;
+//     } catch {
+//       return VerifyResult.Fail;
+//     }
+//   });
+//   leafNode.resource!.onSaveStorage.addListener(({ data, wire }) => storageA.set(data.name.toString(), wire));
+//   leafNode.resource!.onSearchStorage.addListener(async ({ interest }) => {
+//     const wire = await storageA.get(interest.name.toString());
+//     if (wire) {
+//       return Decoder.decode(wire, Data);
+//     } else {
+//       return undefined;
+//     }
+//   });
+//   const segObjNode = schema.set('/object', SegmentedObjectNode, {
+//     leafNode,
+//     lifetimeAfterRto: 100,
+//   });
+//   await schema.attach(appPrefix, fwA);
+//   closers.defer(async () => await schema.detach());

-  // Provide object
-  const matched = Tree.apply(segObjNode, {});
-  await Tree.call(
-    matched,
-    'provide',
-    new BufferChunkSource(
-      b`SEG1 SEG2 SEG3;`,
-      { chunkSize: 5 },
-    ),
-  );
+//   // Provide object
+//   const matched = Tree.apply(segObjNode, {});
+//   await Tree.call(
+//     matched,
+//     'provide',
+//     new BufferChunkSource(
+//       b`SEG1 SEG2 SEG3;`,
+//       { chunkSize: 5 },
+//     ),
+//   );

-  // Fetching Object
-  const results = [];
-  const result = fetch(name`${appPrefix}/object/`, {
-    fw: fwB,
-    verifier: digestSigning,
-    modifyInterest: { mustBeFresh: true },
-    lifetimeAfterRto: 2000,
-    // default naming convention is 50=
-  });
-  for await (const segment of result) {
-    // Reassemble
-    results.push(segment.content);
-  }
+//   // Fetching Object
+//   const results = [];
+//   const result = fetch(name`${appPrefix}/object/`, {
+//     fw: fwB,
+//     verifier: digestSigning,
+//     modifyInterest: { mustBeFresh: true },
+//     lifetimeAfterRto: 2000,
+//     // default naming convention is 50=
+//   });
+//   for await (const segment of result) {
+//     // Reassemble
+//     results.push(segment.content);
+//   }

-  assert.assertEquals(results, [b`SEG1 `, b`SEG2 `, b`SEG3;`]);
-});
+//   assert.assertEquals(results, [b`SEG1 `, b`SEG2 `, b`SEG3;`]);
+// });
diff --git a/src/namespace/segmented-object/segmented-object.ts b/src/namespace/segmented-object/segmented-object.ts
index 1e51cbd..9f4d614 100644
--- a/src/namespace/segmented-object/segmented-object.ts
+++ b/src/namespace/segmented-object/segmented-object.ts
@@ -1,206 +1,206 @@
-import { Component, Data, Signer, Verifier } from '@ndn/packet';
-import { assert, concatBuffers, Reorder } from '@ndn/util';
-import type { ChunkSource } from '@ndn/segmented-object';
-import { Encoder, NNI } from '@ndn/tlv';
-import { collect, map, type WritableStreamish, writeToStream } from 'streaming-iterables';
-import type { Promisable } from 'type-fest';
-import { EventIterator } from 'event-iterator';
-import { BaseNode } from '../base-node.ts';
-import * as Pattern from '../name-pattern.ts';
-import * as Schema from '../schema-tree.ts';
-import { LeafNode } from '../leaf-node.ts';
-import { Fetcher, FetcherOptions, PipelineOptions } from './fetcher.ts';
-
-export interface Result extends PromiseLike<Uint8Array>, AsyncIterable<Data> {
-  /** Iterate over Data packets as they arrive, not sorted in segment number order. */
-  unordered: () => AsyncIterable<Data & { segNum: number }>;
-
-  /** Iterate over payload chunks in segment number order. */
-  chunks: () => AsyncIterable<Uint8Array>;
-
-  /** Write all chunks to the destination stream. */
-  pipe: (dest: WritableStreamish) => Promise<void>;
-
-  /** Number of segments retrieved so far. */
-  readonly count: number;
-}
-
-type SegmentData = {
-  segNum: number;
-  data: Data;
-};
-
-class FetchResult implements Result {
-  constructor(
-    private readonly node: Schema.Node<LeafNode>,
-    private readonly prefixMapping: Pattern.Mapping,
-    private readonly opts: FetcherOptions,
-  ) {}
-
-  public get count(): number {
-    return this.ctx?.count ?? 0;
-  }
-  private ctx?: Fetcher;
-  private promise?: Promise<Uint8Array>;
-
-  private startFetcher() {
-    assert(!this.ctx, 'FetchResult is already used');
-    const ctx = new Fetcher(this.node, this.prefixMapping, this.opts);
-    this.ctx = ctx;
-    ctx.run();
-    return new EventIterator<SegmentData>(({ push, stop, fail, on }) => {
-      let resume: (() => void) | undefined;
-      on('highWater', () => {
-        resume = ctx.pause();
-      });
-      on('lowWater', () => {
-        resume?.();
-      });
-
-      const abort = new AbortController();
-      ctx.onSegment.addListener((segNum, data) => Promise.resolve(push({ segNum, data })));
-      ctx.onEnd.addListener(() => Promise.resolve(stop()));
-      // ctx.addEventListener('error', ({ detail }) => fail(detail), { signal: abort.signal });
-      const errorListener = (err: Error) => Promise.resolve(fail(err));
-      ctx.onError.addListener(errorListener);
-      abort.signal.addEventListener('abort', () => {
-        ctx.onError.removeListener(errorListener);
-      });
-      return () => {
-        resume?.();
-        abort.abort();
-      };
-    });
-  }
-
-  public unordered() {
-    return map(
-      ({ data, segNum }) => Object.assign(data, { segNum }),
-      this.startFetcher(),
-    );
-  }
-
-  private async *ordered() {
-    const reorder = new Reorder<Data>(this.opts.segmentRange?.[0]);
-    for await (const { segNum, data } of this.startFetcher()) {
-      reorder.push(segNum, data);
-      yield* reorder.shift();
-    }
-    assert(reorder.empty, `${reorder.size} leftover segments`);
-  }
-
-  public chunks() {
-    return map((data) => data.content, this.ordered());
-  }
-
-  public pipe(dest: WritableStreamish) {
-    return writeToStream(dest, this.chunks());
-  }
-
-  private async startPromise() {
-    const chunks = await collect(this.chunks());
-    return concatBuffers(chunks);
-  }
-
-  public then<TResult1 = Uint8Array, TResult2 = never>(
-    onfulfilled?: ((value: Uint8Array) => Promisable<TResult1>) | null,
-    onrejected?: ((reason: unknown) => Promisable<TResult2>) | null,
-  ) {
-    this.promise ??= this.startPromise();
-    return this.promise.then(onfulfilled, onrejected);
-  }
-
-  public [Symbol.asyncIterator]() {
-    return this.ordered()[Symbol.asyncIterator]();
-  }
-}
-
-export type SegmentedObjectOpts = {
-  /** The leaf node representing the segments */
-  leafNode: Schema.Node<LeafNode>;
-
-  /**
-   * InterestLifetime added to RTO.
-   * @defaultValue 1000ms
-   *
-   * @remarks
-   * Ignored if `lifetime` is set.
-   */
-  lifetimeAfterRto?: number;
-
-  /**
-   * The name of the pattern variable for segment
-   * @defaultValue parent edge of node
-   */
-  segmentPattern?: string;
-
-  /**
-   * The Component type of the segment
-   * @defaultValue parent edge of node
-   */
-  segmentType?: number;
-
-  pipelineOpts?: PipelineOptions;
-};
-
-export class SegmentedObjectNode extends BaseNode {
-  lifetimeAfterRto: number;
-  segmentPattern: string;
-  segmentType: number;
-  leafNode: Schema.Node<LeafNode>;
-  pipelineOpts: PipelineOptions;
-
-  constructor(
-    config: SegmentedObjectOpts,
-    describe?: string,
-  ) {
-    super(describe);
-    this.leafNode = config.leafNode;
-    this.lifetimeAfterRto = config.lifetimeAfterRto ?? 1000;
-    this.segmentPattern = config.segmentPattern ?? (this.leafNode.upEdge as Pattern.PatternComponent).tag;
-    this.segmentType = config.segmentType ?? this.leafNode.upEdge!.type;
-    this.pipelineOpts = config.pipelineOpts ?? {};
-  }
-
-  public async provide(
-    matched: Schema.StrictMatch<SegmentedObjectNode>,
-    source: ChunkSource,
-    opts: {
-      freshnessMs?: number;
-      validityMs?: number;
-      signer?: Signer;
-    } = {},
-  ) {
-    const results = [];
-    for await (const chunk of source.listChunks()) {
-      const leaf = Schema.apply(this.leafNode, {
-        ...matched.mapping,
-        [this.segmentPattern]: chunk.i,
-      }) as Schema.StrictMatch<LeafNode>;
-      results.push(
-        await Schema.call(leaf, 'provide', chunk.payload, {
-          freshnessMs: opts.freshnessMs,
-          validityMs: opts.validityMs,
-          signer: opts.signer,
-          finalBlockId: chunk.final ? new Component(this.segmentType, Encoder.encode(NNI(chunk.final))) : undefined,
-        }),
-      );
-    }
-    return results;
-  }
-
-  public need(
-    matched: Schema.StrictMatch<SegmentedObjectNode>,
-    opts: {
-      abortSignal?: AbortSignal;
-      lifetimeAfterRto?: number;
-      verifier?: Verifier;
-    } = {},
-  ): Result {
-    return new FetchResult(this.leafNode, matched.mapping, {
-      ...this.pipelineOpts,
-      ...opts,
-      segmentPattern: this.segmentPattern,
-    });
-  }
-}
+// import { Component, Data, Signer, Verifier } from '@ndn/packet';
+// import { assert, concatBuffers, Reorder } from '@ndn/util';
+// import type { ChunkSource } from '@ndn/segmented-object';
+// import { Encoder, NNI } from '@ndn/tlv';
+// import { collect, map, type WritableStreamish, writeToStream } from 'streaming-iterables';
+// import type { Promisable } from 'type-fest';
+// import { EventIterator } from 'event-iterator';
+// import { BaseNode } from '../base-node.ts';
+// import * as Pattern from '../name-pattern.ts';
+// import * as Schema from '../schema-tree.ts';
+// import { LeafNode } from '../leaf-node.ts';
+// import { Fetcher, FetcherOptions, PipelineOptions } from './fetcher.ts';
+
+// export interface Result extends PromiseLike<Uint8Array>, AsyncIterable<Data> {
+//   /** Iterate over Data packets as they arrive, not sorted in segment number order. */
+//   unordered: () => AsyncIterable<Data & { segNum: number }>;
+
+//   /** Iterate over payload chunks in segment number order. */
+//   chunks: () => AsyncIterable<Uint8Array>;
+
+//   /** Write all chunks to the destination stream. */
+//   pipe: (dest: WritableStreamish) => Promise<void>;
+
+//   /** Number of segments retrieved so far. */
+//   readonly count: number;
+// }
+
+// type SegmentData = {
+//   segNum: number;
+//   data: Data;
+// };
+
+// class FetchResult implements Result {
+//   constructor(
+//     private readonly node: Schema.Node<LeafNode>,
+//     private readonly prefixMapping: Pattern.Mapping,
+//     private readonly opts: FetcherOptions,
+//   ) {}
+
+//   public get count(): number {
+//     return this.ctx?.count ?? 0;
+//   }
+//   private ctx?: Fetcher;
+//   private promise?: Promise<Uint8Array>;
+
+//   private startFetcher() {
+//     assert(!this.ctx, 'FetchResult is already used');
+//     const ctx = new Fetcher(this.node, this.prefixMapping, this.opts);
+//     this.ctx = ctx;
+//     ctx.run();
+//     return new EventIterator<SegmentData>(({ push, stop, fail, on }) => {
+//       let resume: (() => void) | undefined;
+//       on('highWater', () => {
+//         resume = ctx.pause();
+//       });
+//       on('lowWater', () => {
+//         resume?.();
+//       });
+
+//       const abort = new AbortController();
+//       ctx.onSegment.addListener((segNum, data) => Promise.resolve(push({ segNum, data })));
+//       ctx.onEnd.addListener(() => Promise.resolve(stop()));
+//       // ctx.addEventListener('error', ({ detail }) => fail(detail), { signal: abort.signal });
+//       const errorListener = (err: Error) => Promise.resolve(fail(err));
+//       ctx.onError.addListener(errorListener);
+//       abort.signal.addEventListener('abort', () => {
+//         ctx.onError.removeListener(errorListener);
+//       });
+//       return () => {
+//         resume?.();
+//         abort.abort();
+//       };
+//     });
+//   }
+
+//   public unordered() {
+//     return map(
+//       ({ data, segNum }) => Object.assign(data, { segNum }),
+//       this.startFetcher(),
+//     );
+//   }
+
+//   private async *ordered() {
+//     const reorder = new Reorder<Data>(this.opts.segmentRange?.[0]);
+//     for await (const { segNum, data } of this.startFetcher()) {
+//       reorder.push(segNum, data);
+//       yield* reorder.shift();
+//     }
+//     assert(reorder.empty, `${reorder.size} leftover segments`);
+//   }
+
+//   public chunks() {
+//     return map((data) => data.content, this.ordered());
+//   }
+
+//   public pipe(dest: WritableStreamish) {
+//     return writeToStream(dest, this.chunks());
+//   }
+
+//   private async startPromise() {
+//     const chunks = await collect(this.chunks());
+//     return concatBuffers(chunks);
+//   }
+
+//   public then<TResult1 = Uint8Array, TResult2 = never>(
+//     onfulfilled?: ((value: Uint8Array) => Promisable<TResult1>) | null,
+//     onrejected?: ((reason: unknown) => Promisable<TResult2>) | null,
+//   ) {
+//     this.promise ??= this.startPromise();
+//     return this.promise.then(onfulfilled, onrejected);
+//   }
+
+//   public [Symbol.asyncIterator]() {
+//     return this.ordered()[Symbol.asyncIterator]();
+//   }
+// }
+
+// export type SegmentedObjectOpts = {
+//   /** The leaf node representing the segments */
+//   leafNode: Schema.Node<LeafNode>;
+
+//   /**
+//    * InterestLifetime added to RTO.
+//    * @defaultValue 1000ms
+//    *
+//    * @remarks
+//    * Ignored if `lifetime` is set.
+//    */
+//   lifetimeAfterRto?: number;
+
+//   /**
+//    * The name of the pattern variable for segment
+//    * @defaultValue parent edge of node
+//    */
+//   segmentPattern?: string;
+
+//   /**
+//    * The Component type of the segment
+//    * @defaultValue parent edge of node
+//    */
+//   segmentType?: number;
+
+//   pipelineOpts?: PipelineOptions;
+// };
+
+// export class SegmentedObjectNode extends BaseNode {
+//   lifetimeAfterRto: number;
+//   segmentPattern: string;
+//   segmentType: number;
+//   leafNode: Schema.Node<LeafNode>;
+//   pipelineOpts: PipelineOptions;
+
+//   constructor(
+//     config: SegmentedObjectOpts,
+//     describe?: string,
+//   ) {
+//     super(describe);
+//     this.leafNode = config.leafNode;
+//     this.lifetimeAfterRto = config.lifetimeAfterRto ?? 1000;
+//     this.segmentPattern = config.segmentPattern ?? (this.leafNode.upEdge as Pattern.PatternComponent).tag;
+//     this.segmentType = config.segmentType ?? this.leafNode.upEdge!.type;
+//     this.pipelineOpts = config.pipelineOpts ?? {};
+//   }
+
+//   public async provide(
+//     matched: Schema.StrictMatch<SegmentedObjectNode>,
+//     source: ChunkSource,
+//     opts: {
+//       freshnessMs?: number;
+//       validityMs?: number;
+//       signer?: Signer;
+//     } = {},
+//   ) {
+//     const results = [];
+//     for await (const chunk of source.listChunks()) {
+//       const leaf = Schema.apply(this.leafNode, {
+//         ...matched.mapping,
+//         [this.segmentPattern]: chunk.i,
+//       }) as Schema.StrictMatch<LeafNode>;
+//       results.push(
+//         await Schema.call(leaf, 'provide', chunk.payload, {
+//           freshnessMs: opts.freshnessMs,
+//           validityMs: opts.validityMs,
+//           signer: opts.signer,
+//           finalBlockId: chunk.final ? new Component(this.segmentType, Encoder.encode(NNI(chunk.final))) : undefined,
+//         }),
+//       );
+//     }
+//     return results;
+//   }
+
+//   public need(
+//     matched: Schema.StrictMatch<SegmentedObjectNode>,
+//     opts: {
+//       abortSignal?: AbortSignal;
+//       lifetimeAfterRto?: number;
+//       verifier?: Verifier;
+//     } = {},
+//   ): Result {
+//     return new FetchResult(this.leafNode, matched.mapping, {
+//       ...this.pipelineOpts,
+//       ...opts,
+//       segmentPattern: this.segmentPattern,
+//     });
+//   }
+// }
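Note: once these files are re-enabled against the new nightly, the intended call pattern, mirroring the commented tests above (segObjNode and payload are placeholders from those tests), would be roughly:

// Hypothetical usage sketch, not part of this patch.
const matched = Tree.apply(segObjNode, {});
// Producer: segment `payload` into signed Data packets, one per chunk.
await Tree.call(matched, 'provide', new BufferChunkSource(payload, { chunkSize: 5 }));
// Consumer: Result is PromiseLike<Uint8Array>, so awaiting it reassembles the object.
const whole: Uint8Array = await Tree.call(matched, 'need');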