Skip to content

Commit

Permalink
feat: add the single consumption method (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq authored Feb 12, 2025
2 parents 9aff0d5 + 55dad68 commit 560ade7
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/tiny-tables-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

create `single` consumption method
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ export type {
// Re-export useful utility types
export type { MaybePromise, Truthy, CallbackOrStream, NodeCallback } from "./util";

export { WindpipeConsumptionError } from "./stream";

export default Stream;
47 changes: 47 additions & 0 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ import { isOk, type Atom } from "../atom";
import { StreamBase } from "./base";
import { exhaust } from "../util";

export class WindpipeConsumptionError extends Error {
static noItems() {
return new WindpipeConsumptionError("no items found whilst consuming stream");
}
}

export class StreamConsumption<T, E> extends StreamBase {
/**
* Create an iterator that will emit each atom in the stream.
Expand Down Expand Up @@ -53,6 +59,47 @@ export class StreamConsumption<T, E> extends StreamBase {
};
}

/**
* Pull the stream once and remove the first item. This will not consume the rest of the
* stream.
*
* @note This method can only be called once on a given stream.
*/
single(options: { atom: true; optional: true }): Promise<Atom<T, E> | undefined>;
single(options: { atom: true; optional?: false }): Promise<Atom<T, E>>;
single(options: { atom?: false; optional: true }): Promise<T | undefined>;
single(options?: { atom?: false; optional?: false }): Promise<T>;
async single({
atom = false,
optional = false,
}: {
atom?: boolean;
optional?: boolean;
} = {}): Promise<T | Atom<T, E> | undefined> {
const it = this[Symbol.asyncIterator]();

const { value, done } = await it.next();

if (done) {
if (optional) {
// Fine to return undefined
return undefined;
}

throw WindpipeConsumptionError.noItems();
}

if (atom) {
return value;
}

if (isOk(value)) {
return value.value;
}

throw value.value;
}

/**
* Iterate through each atom in the stream, and return them as a single array.
*
Expand Down
1 change: 1 addition & 0 deletions src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
import { HigherOrderStream } from "./higher-order";

export type { StreamEnd } from "./base";
export { WindpipeConsumptionError } from "./consumption";

/**
* @template T - Type of the 'values' on the stream.
Expand Down
143 changes: 141 additions & 2 deletions test/consumption.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, test } from "vitest";
import $ from "../src";
import { describe, test, vi } from "vitest";
import $, { WindpipeConsumptionError } from "../src";
import { Readable } from "node:stream";

describe.concurrent("stream consumption", () => {
Expand Down Expand Up @@ -29,6 +29,145 @@ describe.concurrent("stream consumption", () => {
});
});

describe.concurrent("single", () => {
describe("single ok atom", () => {
test("with no params", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.ok(1));

expect(s.single()).resolves.toEqual(1);
});

test("with optional false", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.ok(1));

expect(s.single({ optional: false })).resolves.toEqual(1);
});

test("with optional true", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.ok(1));

expect(s.single({ optional: true })).resolves.toEqual(1);
});

test("with atom false", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.ok(1));

expect(s.single({ atom: false })).resolves.toEqual(1);
});

test("with atom true", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.ok(1));

expect(s.single({ atom: true })).resolves.toEqual($.ok(1));
});
});

describe("single error atom", () => {
test("with no params", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.error(1));

expect(s.single()).rejects.toEqual(1);
});

test("with optional false", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.error(1));

expect(s.single({ optional: false })).rejects.toEqual(1);
});

test("with optional true", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.error(1));

expect(s.single({ optional: true })).rejects.toEqual(1);
});

test("with atom false", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.error(1));

expect(s.single({ atom: false })).rejects.toEqual(1);
});

test("with atom true", async ({ expect }) => {
expect.assertions(1);

const s = $.of($.error(1));

expect(s.single({ atom: true })).resolves.toEqual($.error(1));
});
});

describe("empty stream", () => {
test("with no params", async ({ expect }) => {
expect.assertions(1);

const s = $.from([]);

expect(s.single()).rejects.toThrow(WindpipeConsumptionError);
});

test("with optional false", async ({ expect }) => {
expect.assertions(1);

const s = $.from([]);

expect(s.single({ optional: false })).rejects.toThrow(WindpipeConsumptionError);
});

test("with optional true", async ({ expect }) => {
expect.assertions(1);

const s = $.from([]);

expect(s.single({ optional: true })).resolves.toEqual(undefined);
});

test("with atom false", async ({ expect }) => {
expect.assertions(1);

const s = $.from([]);

expect(s.single({ atom: false })).rejects.toThrow(WindpipeConsumptionError);
});

test("with atom true", async ({ expect }) => {
expect.assertions(1);

const s = $.from([]);

expect(s.single({ atom: true })).rejects.toThrow(WindpipeConsumptionError);
});
});

test("single pull", async ({ expect }) => {
expect.assertions(2);

const fn = vi.fn().mockReturnValue(Promise.resolve(1));

const s = $.fromNext(fn);

expect(s.single()).resolves.toEqual(1);
expect(fn).toBeCalledTimes(1);
});
});

describe.concurrent("toArray", () => {
test("values", async ({ expect }) => {
expect.assertions(1);
Expand Down

0 comments on commit 560ade7

Please sign in to comment.