Skip to content

Commit

Permalink
feat: implement fromCallback for node-style callbacks (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq authored May 7, 2024
2 parents 741e3de + d7a7c42 commit 5589c04
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/green-suns-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Implement `fromCallback` for stream creation
23 changes: 23 additions & 0 deletions src/stream/base.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { normalise, type Atom, type MaybeAtom, error, unknown } from "../atom";
import { Stream } from ".";
import { Readable, Writable } from "stream";
import { createNodeCallback } from "../util";

/**
* Marker for the end of a stream.
Expand Down Expand Up @@ -84,6 +85,28 @@ export class StreamBase {
throw new TypeError("expected a promise, (async) iterator, or (async) iterable");
}

/**
* Create a stream from a node-style callback. A node-compatible callback function will be
* passed as the first parameter to the callback of this function.
*
* The first parameter provided to the callback (the `error`) will be emitted as an `Error`
* atom, whilst the second parameter (the `value`) will be emitted as an `Ok` atom.
*
* @example
* $.fromCallback((next) => someAsyncMethod(paramA, paramB, next));
*
* @group Creation
*/
static fromCallback<T, E>(cb: (next: (error: E, value: T) => unknown) => void): Stream<T, E> {
// Set up a next function
const [promise, next] = createNodeCallback<T, E>();

// Run the callback
cb(next);

return StreamBase.fromPromise(promise);
}

/**
* Create a stream from a promise. The promise will be `await`ed, and the resulting value only
* ever emitted once.
Expand Down
33 changes: 32 additions & 1 deletion src/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Stream from ".";
import Stream, { type Atom } from ".";

/**
* Maybe it's a promise. Maybe it's not. Who's to say.
Expand Down Expand Up @@ -30,3 +30,34 @@ export async function exhaust(iterable: AsyncIterable<unknown>) {
}
}
}

/**
* Creates a `next` function and associated promise to promise-ify a node style callback. The
* `next` function must be passed as the callback to a function, and the resulting error or value
* will be emitted from the promise. The promise will always resolve.
*
* The error value of the callback (first parameter) will be emitted as an `Error` atom from the
* promise, whilst the value of the callback (second parameter) will be emitted as an `Ok` atom on
* the promise.
*/
export function createNodeCallback<T, E>(): [Promise<Atom<T, E>>, (error: E, value: T) => void] {
// Resolve function to be hoisted out of the promise
let resolve: (atom: Atom<T, E>) => void;

// Create the prom
const promise = new Promise<Atom<T, E>>((res) => {
resolve = res;
});

// Create the next callback
const next = (err: E, value: T) => {
if (err) {
resolve(Stream.error(err));
} else {
resolve(Stream.ok(value));
}
};

// Return a tuple of the promise and next function
return [promise, next];
}
39 changes: 39 additions & 0 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,43 @@ describe.concurrent("stream creation", () => {
expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});
});

describe.concurrent("from callback", () => {
/**
* Sample function that accepts a node-style callback.
*
* @param success - Whether the method should succeed or fail.
* @param cb - Node-style callback to pass error or value to.
*/
function someNodeCallback(
success: boolean,
cb: (error: string | undefined, value?: number) => void,
) {
if (success) {
cb(undefined, 123);
} else {
cb("an error");
}
}

test("value returned from callback", async ({ expect }) => {
expect.assertions(1);

const s = $.fromCallback((next) => {
someNodeCallback(true, next);
});

expect(await s.toArray({ atoms: true })).toEqual([$.ok(123)]);
});

test("error returned from callback", async ({ expect }) => {
expect.assertions(1);

const s = $.fromCallback((next) => {
someNodeCallback(false, next);
});

expect(await s.toArray({ atoms: true })).toEqual([$.error("an error")]);
});
});
});

0 comments on commit 5589c04

Please sign in to comment.