Skip to content

Commit

Permalink
feat: onFirst and onLast operators (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq authored Nov 14, 2024
2 parents 7145f7a + 7c566d2 commit 507ca15
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/shy-cobras-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

create `onFirst` and `onLast` operators
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,4 +567,57 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
}
});
}

/**
* Run the provided callback after the first atom passes through the stream.
*
* @param callback - Callback to run.
* @param [options = {}]
* @param [options.atom = true] - Run on any atom (true by default). Will only run after first
* `ok` atom if false.
*/
onFirst(callback: () => void, options: { atom?: boolean } = {}): Stream<T, E> {
this.trace("onFirst");

return this.consume(async function* (it) {
let first = false;

for await (const atom of it) {
yield atom;

if (!first && (options?.atom !== false || Stream.isOk(atom))) {
callback();
first = true;
}
}
});
}

/**
* Run the provided callback after the last atom passes through the stream.
*
* @param callback - Callback to run.
* @param [options = {}]
* @param [options.atom = true] - Run on any atom (true by default). Will only run after last
* `ok` atom if false.
*/
onLast(callback: () => void, options: { atom?: boolean } = {}): Stream<T, E> {
this.trace("onLast");

return this.consume(async function* (it) {
let emitted = false;

for await (const atom of it) {
yield atom;

if (options?.atom !== false || Stream.isOk(atom)) {
emitted = true;
}
}

if (emitted) {
callback();
}
});
}
}
209 changes: 208 additions & 1 deletion test/transforms.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { afterEach, beforeEach, describe, test, vi } from "vitest";
import { afterEach, beforeEach, describe, test, vi, type ExpectStatic } from "vitest";
import $ from "../src";

describe("stream transforms", () => {
Expand Down Expand Up @@ -558,4 +558,211 @@ describe("stream transforms", () => {
});
});
});

describe("onFirst", () => {
async function testAtom(
expect: ExpectStatic,
atom: unknown,
atomOption: boolean,
spyCalled: boolean,
) {
expect.assertions(2);

const spy = vi.fn();

const s = await $.from([atom])
.onFirst(spy, { atom: atomOption })
.toArray({ atoms: true });

expect(s, "stream should not be altered").toEqual([atom]);

if (spyCalled) {
expect(spy, "callback must be called").toHaveBeenCalledOnce();
} else {
expect(spy, "callback must not be called").not.toHaveBeenCalled();
}
}

describe("single item in stream", () => {
test("ok atom", async ({ expect }) => {
await testAtom(expect, $.ok(1), true, true);
});

test("error atom", async ({ expect }) => {
await testAtom(expect, $.error(1), true, true);
});

test("exception atom", async ({ expect }) => {
await testAtom(expect, $.exception(1, []), true, true);
});
});

test("multiple items in stream", async ({ expect }) => {
expect.assertions(2);

const spy = vi.fn();

const s = await $.from([$.error(1), $.ok(2), $.ok(3), $.exception(4, []), $.ok(5)])
.onFirst(spy)
.toArray({ atoms: true });

expect(s, "stream should not be altered").toEqual([
$.error(1),
$.ok(2),
$.ok(3),
$.exception(4, []),
$.ok(5),
]);
expect(spy, "callback must be called once").toHaveBeenCalledOnce();
});

test("no items in stream", async ({ expect }) => {
expect.assertions(2);

const spy = vi.fn();

const s = await $.from([]).onFirst(spy).toArray();

expect(s, "stream should not be altered").toEqual([]);
expect(spy, "callback must not be called").not.toHaveBeenCalled();
});

describe("with atom = false", () => {
describe("single item in stream", () => {
test("ok atom", async ({ expect }) => {
await testAtom(expect, $.ok(1), false, true);
});

test("error atom", async ({ expect }) => {
await testAtom(expect, $.error(1), false, false);
});

test("exception atom", async ({ expect }) => {
await testAtom(expect, $.exception(1, []), false, false);
});
});

test("error, ok", async ({ expect }) => {
expect.assertions(4);

const spy = vi.fn();

const s = $.from<number, number>([$.error(1), $.ok(2)])
.onFirst(spy, { atom: false })
[Symbol.asyncIterator]();

expect((await s.next()).value, "error should be emitted first").toEqual($.error(1));
expect(spy, "callback shouldn't be triggered on an error").not.toHaveBeenCalled();

expect((await s.next()).value, "ok value should be emitted next").toEqual($.ok(2));
expect(spy, "spy should only be called after the ok atom").toHaveBeenCalledOnce();
});
});
});

describe("onLast", () => {
async function testAtom(
expect: ExpectStatic,
atom: unknown,
atomOption: boolean,
spyCalled: boolean,
) {
expect.assertions(2);

const spy = vi.fn();

const s = await $.from([atom])
.onLast(spy, { atom: atomOption })
.toArray({ atoms: true });

expect(s, "stream should not be altered").toEqual([atom]);

if (spyCalled) {
expect(spy, "callback must be called").toHaveBeenCalledOnce();
} else {
expect(spy, "callback must not be called").not.toHaveBeenCalled();
}
}

describe("single item in stream", () => {
test("ok atom", async ({ expect }) => {
await testAtom(expect, $.ok(1), true, true);
});

test("error atom", async ({ expect }) => {
await testAtom(expect, $.error(1), true, true);
});

test("exception atom", async ({ expect }) => {
await testAtom(expect, $.exception(1, []), true, true);
});
});

test("multiple items in stream", async ({ expect }) => {
expect.assertions(2);

const spy = vi.fn();

const s = await $.from([$.error(1), $.ok(2), $.ok(3), $.exception(4, []), $.ok(5)])
.onLast(spy)
.toArray({ atoms: true });

expect(s, "stream should not be altered").toEqual([
$.error(1),
$.ok(2),
$.ok(3),
$.exception(4, []),
$.ok(5),
]);
expect(spy, "callback must be called once").toHaveBeenCalledOnce();
});

test("no items in stream", async ({ expect }) => {
expect.assertions(2);

const spy = vi.fn();

const s = await $.from([]).onLast(spy).toArray();

expect(s, "stream should not be altered").toEqual([]);
expect(spy, "callback must not be called").not.toHaveBeenCalled();
});

describe("with atom = false", () => {
describe("single item in stream", () => {
test("ok atom", async ({ expect }) => {
await testAtom(expect, $.ok(1), false, true);
});

test("error atom", async ({ expect }) => {
await testAtom(expect, $.error(1), false, false);
});

test("exception atom", async ({ expect }) => {
await testAtom(expect, $.exception(1, []), false, false);
});
});

test("error, ok", async ({ expect }) => {
expect.assertions(4);

const spy = vi.fn();

const s = $.from<number, number>([$.ok(1), $.error(2)])
.onLast(spy, { atom: false })
[Symbol.asyncIterator]();

expect((await s.next()).value, "ok value should be emitted first").toEqual($.ok(1));
expect(spy, "callback shouldn't be triggered on an error").not.toHaveBeenCalled();

expect((await s.next()).value, "error value should be emitted next").toEqual(
$.error(2),
);
expect(
spy,
"spy should only be called after the stream ends atom",
).toHaveBeenCalledOnce();
});
});
});
});

0 comments on commit 507ca15

Please sign in to comment.