Skip to content

Commit 18539a4

Browse files
committed
psync: pako => CompressionStream
1 parent 255a5fd commit 18539a4

File tree

8 files changed

+45
-31
lines changed

8 files changed

+45
-31
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
},
1616
"packageManager": "[email protected]+sha512.b2dc20e2fc72b3e18848459b37359a32064663e5627a51e4c74b2c29dd8e8e0491483c3abb40789cfd578bf362fb6ba8261b05f0387d76792ed6e23ea3b1b6a0",
1717
"devDependencies": {
18-
"@types/node": "^22.10.10",
18+
"@types/node": "^22.12.0",
1919
"@types/wtfnode": "^0.7.3",
20-
"@typescript/lib-dom": "npm:@types/[email protected].199",
20+
"@typescript/lib-dom": "npm:@types/[email protected].200",
2121
"@vitest/coverage-v8": "^3.0.4",
2222
"@yoursunny/xo-config": "0.60.0",
2323
"codedown": "^3.2.1",

pkg/psync/package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@
3535
"@ndn/util": "workspace:*",
3636
"@yoursunny/psync-bloom": "github:yoursunny/PSyncBloom-wasm#build",
3737
"murmurhash3js-revisited": "^3.0.0",
38-
"pako": "^2.1.0",
38+
"streaming-iterables": "^8.0.1",
3939
"tslib": "^2.8.1",
40+
"type-fest": "^4.33.0",
4041
"typescript-event-target": "^1.1.1"
4142
},
4243
"devDependencies": {
4344
"@ndn/fw": "workspace:^",
4445
"@ndn/l3face": "workspace:*",
45-
"@types/murmurhash3js-revisited": "^3.0.3",
46-
"@types/pako": "^2.0.3"
46+
"@types/murmurhash3js-revisited": "^3.0.3"
4747
}
4848
}

pkg/psync/src/codec.ts

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Component, type Name, TT } from "@ndn/packet";
22
import type { BloomFilter } from "@yoursunny/psync-bloom";
3+
import type { Promisable } from "type-fest";
34

45
import type { PSyncCore } from "./core";
56
import { IBLT } from "./iblt";
@@ -9,30 +10,30 @@ export class PSyncCodec {
910
Object.assign(this, p);
1011
}
1112

12-
public iblt2comp(iblt: IBLT): Component {
13-
return new Component(TT.GenericNameComponent, this.ibltCompression.compress(iblt.serialize()));
13+
public async iblt2comp(iblt: IBLT): Promise<Component> {
14+
return new Component(TT.GenericNameComponent, await this.ibltCompression.compress(iblt.serialize()));
1415
}
1516

16-
public comp2iblt(comp: Component): IBLT {
17+
public async comp2iblt(comp: Component): Promise<IBLT> {
1718
const iblt = new IBLT(this.ibltParams);
18-
iblt.deserialize(this.ibltCompression.decompress(comp.value));
19+
iblt.deserialize(await this.ibltCompression.decompress(comp.value));
1920
return iblt;
2021
}
2122

22-
public state2buffer(state: PSyncCore.State): Uint8Array {
23+
public async state2buffer(state: PSyncCore.State): Promise<Uint8Array> {
2324
return this.contentCompression.compress(this.encodeState(state));
2425
}
2526

26-
public buffer2state(buffer: Uint8Array): PSyncCore.State {
27-
return this.decodeState(this.contentCompression.decompress(buffer));
27+
public async buffer2state(buffer: Uint8Array): Promise<PSyncCore.State> {
28+
return this.decodeState(await this.contentCompression.decompress(buffer));
2829
}
2930
}
3031
export interface PSyncCodec extends Readonly<PSyncCodec.Parameters> {}
3132

3233
export namespace PSyncCodec {
3334
export interface Compression {
34-
compress: (input: Uint8Array) => Uint8Array;
35-
decompress: (compressed: Uint8Array) => Uint8Array;
35+
compress: (input: Uint8Array) => Promisable<Uint8Array>;
36+
decompress: (compressed: Uint8Array) => Promisable<Uint8Array>;
3637
}
3738

3839
export interface Parameters {

pkg/psync/src/full.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ export class FullSync extends TypedEventTarget<EventMap> implements SyncProtocol
143143
return undefined;
144144
}
145145

146-
const recvIblt = this.codec.comp2iblt(ibltComp);
146+
const recvIblt = await this.codec.comp2iblt(ibltComp);
147147
const { success, positive, negative, total } = this.c.iblt.diff(recvIblt);
148148
if (!success && (total >= this.c.threshold || total === 0)) {
149149
const state = this.c.list(({ seqNum }) => seqNum > 0);
@@ -195,13 +195,15 @@ export class FullSync extends TypedEventTarget<EventMap> implements SyncProtocol
195195
}
196196
};
197197

198-
private async sendSyncData(interest: Interest, state: PSyncCore.State, action: string, recvIblt: IBLT): Promise<Data | undefined> {
198+
private async sendSyncData(
199+
interest: Interest, state: PSyncCore.State, action: string, recvIblt: IBLT,
200+
): Promise<Data | undefined> {
199201
this.debug(action, recvIblt, state);
200202
if (this.cCurrentInterestName?.equals(interest.name)) {
201203
this.scheduleSyncInterest(0);
202204
}
203205

204-
const server = this.pBuffer.add(interest.name, state, this.pFreshness);
206+
const server = await this.pBuffer.add(interest.name, state, this.pFreshness);
205207
return server.processInterest(interest);
206208
}
207209

@@ -220,7 +222,7 @@ export class FullSync extends TypedEventTarget<EventMap> implements SyncProtocol
220222

221223
const abort = new AbortController();
222224
this.cAbort = abort;
223-
const ibltComp = this.codec.iblt2comp(this.c.iblt);
225+
const ibltComp = await this.codec.iblt2comp(this.c.iblt);
224226
const name = this.syncPrefix.append(ibltComp, GenericNumber.create(this.c.sumSeqNum));
225227
this.cCurrentInterestName = name;
226228
this.debug("c-request");

pkg/psync/src/param-zlib.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
1-
import { deflate, inflate } from "pako";
1+
import { concatBuffers } from "@ndn/util";
2+
import { collect } from "streaming-iterables";
23

34
import type { PSyncCodec } from "./codec";
45

56
/** Use zlib compression with PSync. */
67
export const PSyncZlib: PSyncCodec.Compression = {
7-
compress(input) {
8-
return deflate(input, { level: 9 });
8+
async compress(input) {
9+
return doTransform(input, new CompressionStream("deflate"));
910
},
10-
decompress(compressed) {
11-
return inflate(compressed);
11+
async decompress(compressed) {
12+
return doTransform(compressed, new DecompressionStream("deflate"));
1213
},
1314
};
15+
16+
async function doTransform(input: Uint8Array, tr: TransformStream<Uint8Array, Uint8Array>): Promise<Uint8Array> {
17+
const chunks = await collect(
18+
new Blob([input]).stream()
19+
.pipeThrough(tr) as unknown as AsyncIterable<Uint8Array>,
20+
);
21+
return concatBuffers(chunks);
22+
}

pkg/psync/src/partial-publisher.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,12 +137,12 @@ export class PartialPublisher extends TypedEventTarget<EventMap> implements Sync
137137
}
138138

139139
const ibltComp = interest.name.at(-1);
140-
const recvIblt = this.codec.comp2iblt(ibltComp);
140+
const recvIblt = await this.codec.comp2iblt(ibltComp);
141141

142142
const { success, positive, total } = this.c.iblt.diff(recvIblt);
143143
if (!success) {
144144
// TODO publish ContentType=Nack via StateProducerBuffer
145-
const ibltComp = this.codec.iblt2comp(this.c.iblt);
145+
const ibltComp = await this.codec.iblt2comp(this.c.iblt);
146146
const name = interest.name.append(ibltComp, Segment.create(0));
147147
return new Data(name, Data.ContentType(0x03), Data.FreshnessPeriod(this.sFreshness), Data.FinalBlock);
148148
}
@@ -205,12 +205,14 @@ export class PartialPublisher extends TypedEventTarget<EventMap> implements Sync
205205
}
206206
};
207207

208-
private sendStateData(interest: Interest, state: PSyncCore.State, action: string, freshness: number): Promise<Data | undefined> {
209-
const ibltComp = this.codec.iblt2comp(this.c.iblt);
208+
private async sendStateData(
209+
interest: Interest, state: PSyncCore.State, action: string, freshness: number,
210+
): Promise<Data | undefined> {
211+
const ibltComp = await this.codec.iblt2comp(this.c.iblt);
210212
const name = interest.name.append(ibltComp);
211213

212214
this.debug(action, interest);
213-
const server = this.pBuffer.add(name, state, freshness);
215+
const server = await this.pBuffer.add(name, state, freshness);
214216
return server.processInterest(interest);
215217
}
216218
}

pkg/psync/src/state-fetcher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export class StateFetcher {
4848
describe: `${this.describe}[${describeSuffix}f]`,
4949
signal,
5050
});
51-
const state = this.codec.buffer2state(payload);
51+
const state = await this.codec.buffer2state(payload);
5252
return { versioned, state };
5353
}
5454
}

pkg/psync/src/state-producer-buffer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ export class StateProducerBuffer {
3030
}
3131
}
3232

33-
public add(name: Name, state: PSyncCore.State, freshnessPeriod: number): Server {
34-
const source = new BufferChunkSource(this.codec.state2buffer(state));
33+
public async add(name: Name, state: PSyncCore.State, freshnessPeriod: number): Promise<Server> {
34+
const source = new BufferChunkSource(await this.codec.state2buffer(state));
3535
const server = serveVersioned(name, source, {
3636
freshnessPeriod,
3737
pOpts: this.pOpts,

0 commit comments

Comments
 (0)