Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TypeScript port #4

Open
ssured opened this issue Sep 4, 2018 · 1 comment
Open

TypeScript port #4

ssured opened this issue Sep 4, 2018 · 1 comment

Comments

@ssured
Copy link

ssured commented Sep 4, 2018

Your lib is awesome! Because I use TypeScript, I ported the code to TypeScript; I'm not an expert but it works pretty nice. Maybe you want to support TypeScript?

// code based on JS version at https://github.com/tannerlinsley/swimmer/blob/5c4fceb83e233bbc8850fb55337526c07ec3a18a/src/index.js
type TThunk<T> = () => Promise<T>;

type TPoolConfig<T> = {
    concurrency: number;
    started: boolean;
    tasks: TThunk<T>[];
};

type TSettledCb = () => void;
type TErrorCb<T, E> = (error: E, task: TThunk<T>) => void;
type TSuccessCb<T> = (result: T, task: TThunk<T>) => void;

const defaultConfig: TPoolConfig<any> = {
    concurrency: 5,
    started: true,
    tasks: [],
};

const resolves = new WeakMap<TThunk<any>, Function>();
const rejects = new WeakMap<TThunk<any>, Function>();

export function createPool<T = any, E = any>(config: Partial<TPoolConfig<T>> = defaultConfig) {
    const { concurrency, started, tasks } = {
        ...defaultConfig,
        ...config,
    };

    let onSettles: TSettledCb[] = [];
    let onErrors: TErrorCb<T, E>[] = [];
    let onSuccesses: TSuccessCb<T>[] = [];

    let running: boolean = started;
    let active: TThunk<T>[] = [];
    let pending: TThunk<T>[] = tasks;
    let currentConcurrency = concurrency;

    const tick = () => {
        if (!running) {
            return;
        }
        if (!pending.length && !active.length) {
            onSettles.forEach(d => d());
            return;
        }
        while (active.length < currentConcurrency && pending.length) {
            const nextFn = pending.shift()!;
            active.push(nextFn);
            /* eslint-disable no-loop-func */
            (async () => {
                let success = false;
                let res: any;
                let error: any;
                try {
                    res = await nextFn();
                    success = true;
                } catch (e) {
                    error = e;
                }
                active = active.filter(d => d !== nextFn);
                if (success) {
                    resolves.get(nextFn)!(res);
                    onSuccesses.forEach(d => d(res, nextFn));
                } else {
                    rejects.get(nextFn)!(error);
                    onErrors.forEach(d => d(error, nextFn));
                }
                tick();
            })();
            /* eslint-enable no-loop-func */
        }
    };

    const api = {
        add: (fn: TThunk<T>, { priority }: { priority?: boolean } = {}) =>
            new Promise((resolve, reject) => {
                if (priority) {
                    pending.unshift(fn);
                } else {
                    pending.push(fn);
                }
                resolves.set(fn, resolve);
                rejects.set(fn, reject);
                tick();
            }),
        throttle: (n: number) => {
            currentConcurrency = n;
        },
        onSettled: (cb: TSettledCb) => {
            onSettles.push(cb);
            return () => {
                onSettles = onSettles.filter(d => d !== cb);
            };
        },
        onError: (cb: TErrorCb<T, E>) => {
            onErrors.push(cb);
            return () => {
                onErrors = onErrors.filter(d => d !== cb);
            };
        },
        onSuccess: (cb: TSuccessCb<T>) => {
            onSuccesses.push(cb);
            return () => {
                onSuccesses = onSuccesses.filter(d => d !== cb);
            };
        },
        stop: () => {
            running = false;
        },
        start: () => {
            running = true;
            tick();
        },
        clear: () => {
            pending = [];
        },
        getActive: () => active,
        getPending: () => pending,
        getAll: () => [...active, ...pending],
        isRunning: () => running,
        isSettled: () => !running && !active.length && !pending.length,
    };

    return api;
}

export function poolAll<T>(tasks: TThunk<T>[], concurrency: number) {
    return new Promise((resolve, reject) => {
        const pool = createPool({
            concurrency,
        });
        const results: T[] = [];
        pool.onSettled(() => {
            resolve(results);
        });
        pool.onError(err => {
            reject(err);
        });
        tasks.forEach((task, i) => {
            pool.add(async () => {
                const res = await task();
                results[i] = res;
                return res;
            });
        });
        pool.start();
    });
}
@ssured
Copy link
Author

ssured commented Sep 4, 2018

Oh, one thing; I added a WeakMap to replace the .resolve and .reject props you added on the thunk. It needs a compatible browser (evergreen)

@ssured ssured changed the title Typescript port TypeScript port Sep 4, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant