Skip to content

Commit

Permalink
feat: fromPusher stream creation method (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq authored Nov 19, 2024
2 parents c3efc27 + 035cf6a commit b9645f0
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/fifty-grapes-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

add `fromPusher` stream creation method
71 changes: 70 additions & 1 deletion src/stream/base.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { normalise, type Atom, type MaybeAtom, error, exception } from "../atom";
import { Stream } from ".";
import { Readable, Writable } from "stream";
import { createNodeCallback } from "../util";
import { createNodeCallback, newSignal } from "../util";

/**
* Unique type to represent the stream end marker.
Expand Down Expand Up @@ -222,6 +222,75 @@ export class StreamBase {
);
}

/**
* Create a new stream, and use the provided `push` and `done` methods to add values to it, and
* complete the stream.
*
* - `push`: Adds the provided value to the stream.
* - `done`: Indicatest that the stream is done, meaning that any future calls to `push` or
* `done` will be ignored.
*
* @group Creation
*/
static fromPusher<T, E>(): {
stream: Stream<T, E>;
push: (value: MaybeAtom<T, E>) => void;
done: () => void;
} {
// Queue of atoms waiting to be pushed.
const queue: MaybeAtom<T, E>[] = [];

// Flag to indicate when the `done` method is called.
let done = false;

// Signal to indicate when some action has taken place.
let signal = newSignal();

async function next(retry = 10) {
// If there's something waiting in the queue, immediately produce it.
if (queue.length > 0) {
return queue.shift()!;
}

// If the stream is complete, immediately return.
if (done) {
return Stream.StreamEnd;
}

// Prepare a new signal, and wait for it.
signal = newSignal();
await signal;

// Protection incase something goes whack with the signal, shouldn't ever be
// encountered.
if (retry === 0) {
console.warn("[windpipe] recursion limit hit whilst waiting for pushed value");

return Stream.StreamEnd;
}

// Recurse and try again.
return next(retry - 1);
}

return {
stream: Stream.fromNext<T, E>(next),
push: (value: MaybeAtom<T, E>) => {
if (done) {
console.error("[windpipe] cannot push after stream is complete");
return;
}

queue.push(value);
signal.done();
},
done: () => {
done = true;
signal.done();
},
};
}

/**
* Create a new stream containing a single value. Unless an atom is provided, it will be
* converted to an `ok` atom.
Expand Down
166 changes: 164 additions & 2 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { describe, test } from "vitest";
import { afterEach, beforeEach, describe, test, vi } from "vitest";
import $ from "../src";
import { Readable } from "stream";

describe.concurrent("stream creation", () => {
describe("stream creation", () => {
describe.concurrent("from promise", () => {
test("resolving promise to emit value", async ({ expect }) => {
expect.assertions(1);
Expand Down Expand Up @@ -197,4 +197,166 @@ describe.concurrent("stream creation", () => {
]);
});
});

describe("fromPusher", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});

test("push single value", async ({ expect }) => {
expect.assertions(1);

const { stream, push, done } = $.fromPusher();

// Push a value, then immediately complete
push(1);
done();

expect(await stream.toArray({ atoms: true })).toEqual([$.ok(1)]);
});

test("push single value, delayed done", async ({ expect }) => {
expect.assertions(1);

const { stream, push, done } = $.fromPusher();

const streamPromise = stream.toArray({ atoms: true });

// Push a value
push(1);

// Complete at a later point
setImmediate(() => done());

await vi.runAllTimersAsync();

expect(await streamPromise).toEqual([$.ok(1)]);
});

test("push single value, not done", async ({ expect }) => {
expect.assertions(2);

const { stream, push } = $.fromPusher();

// Push a value.
push(1);

const spy = vi.fn();

// Ensure the stream never completes.
stream
.tap(spy)
.exhaust()
.then(() => expect.fail("promise must not resolve"));

const WAIT_TIME = 10_000;

// Block the test until the microtask queue is empty, to make sure there's nothing
// else coming down the stream.
setTimeout(() => {
expect(spy).toBeCalledTimes(1);
expect(spy).toBeCalledWith(1);
}, WAIT_TIME);

await vi.advanceTimersByTimeAsync(WAIT_TIME);
});

test("multiple values", async ({ expect }) => {
expect.assertions(1);

const { stream, push, done } = $.fromPusher();

push(1);
push(2);
push(3);
push(4);
done();

expect(await stream.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(2),
$.ok(3),
$.ok(4),
]);
});

test("multiple atoms", async ({ expect }) => {
expect.assertions(1);

const { stream, push, done } = $.fromPusher();

push($.ok(1));
push($.ok(2));
push($.error(3));
push($.exception(4, []));
done();

expect(await stream.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(2),
$.error(3),
$.exception(4, []),
]);
});

test("no items pushed", async ({ expect }) => {
expect.assertions(1);

const { stream, done } = $.fromPusher();

done();

expect(await stream.toArray({ atoms: true })).toEqual([]);
});

test("push items with delay", async ({ expect }) => {
expect.assertions(9);

const spy = vi.fn();

const { stream, push, done } = $.fromPusher();

const streamPromise = stream.map(spy).exhaust();

// Some synchronous values
push($.ok(1));
push($.ok(2));

await vi.runAllTimersAsync();

// Some timeout values
setTimeout(() => push($.ok(3)), 1000);
setTimeout(() => push($.ok(4)), 2000);
setTimeout(() => push($.ok(5)), 3000);

// Finish the stream
setTimeout(() => done(), 4000);

// Initial assertions
expect(spy).toHaveBeenCalledTimes(2);
expect(spy).toHaveBeenNthCalledWith(1, 1);
expect(spy).toHaveBeenNthCalledWith(2, 2);

// Async assertions
await vi.advanceTimersByTimeAsync(1000);
expect(spy).toHaveBeenCalledTimes(3);
expect(spy).toHaveBeenNthCalledWith(3, 3);

await vi.advanceTimersByTimeAsync(1000);
expect(spy).toHaveBeenCalledTimes(4);
expect(spy).toHaveBeenNthCalledWith(4, 4);

await vi.advanceTimersByTimeAsync(1000);
expect(spy).toHaveBeenCalledTimes(5);
expect(spy).toHaveBeenNthCalledWith(5, 5);

// Run everything else thorugh
await vi.runAllTimersAsync();

await streamPromise;
});
});
});

0 comments on commit b9645f0

Please sign in to comment.