-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(upload-client): add
Queue
helper to make queued uploads (#481)
* feat(upload-client): add `Queue` helper to make queued uploads * chore: fix lint warning * chore: refactor * chore: update readme
- Loading branch information
Showing
5 changed files
with
284 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
type Task<T = unknown> = () => Promise<T> | ||
type Resolver = (value: unknown) => void | ||
type Rejector = (error: unknown) => void | ||
|
||
export class Queue { | ||
#concurrency = 1 | ||
#pending: Task[] = [] | ||
#running = 0 | ||
#resolvers: WeakMap<Task, Resolver> = new WeakMap() | ||
#rejectors: WeakMap<Task, Rejector> = new WeakMap() | ||
|
||
constructor(concurrency: number) { | ||
this.#concurrency = concurrency | ||
} | ||
|
||
#run() { | ||
const tasksLeft = this.#concurrency - this.#running | ||
for (let i = 0; i < tasksLeft; i++) { | ||
const task = this.#pending.shift() | ||
if (!task) { | ||
return | ||
} | ||
const resolver = this.#resolvers.get(task) | ||
const rejector = this.#rejectors.get(task) | ||
if (!resolver || !rejector) | ||
throw new Error( | ||
'Unexpected behavior: resolver or rejector is undefined' | ||
) | ||
this.#running += 1 | ||
|
||
task() | ||
.finally(() => { | ||
this.#resolvers.delete(task) | ||
this.#rejectors.delete(task) | ||
this.#running -= 1 | ||
this.#run() | ||
}) | ||
.then((value) => resolver(value)) | ||
.catch((error) => rejector(error)) | ||
} | ||
} | ||
|
||
add<T>(task: Task<T>): Promise<T> { | ||
return new Promise((resolve, reject) => { | ||
this.#resolvers.set(task, resolve as Resolver) | ||
this.#rejectors.set(task, reject as Rejector) | ||
|
||
this.#pending.push(task) | ||
this.#run() | ||
}) as Promise<T> | ||
} | ||
|
||
get pending() { | ||
return this.#pending.length | ||
} | ||
|
||
get running() { | ||
return this.#running | ||
} | ||
|
||
set concurrency(value: number) { | ||
this.#concurrency = value | ||
this.#run() | ||
} | ||
|
||
get concurrency() { | ||
return this.#concurrency | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
import { delay } from '@uploadcare/api-client-utils' | ||
import { Queue } from '../../src/tools/Queue' | ||
import { expect } from '@jest/globals' | ||
|
||
const DELAY = 100 | ||
const TIME_TOLERANCE = DELAY / 2 | ||
|
||
describe('Queue', () => { | ||
describe('#add', () => { | ||
it('should return promise resolved with the same value', async () => { | ||
expect.assertions(1) | ||
const queue = new Queue(1) | ||
const promise = queue.add(() => Promise.resolve('result')) | ||
await expect(promise).resolves.toBe('result') | ||
}) | ||
|
||
it('should return promise rejected with the same error', async () => { | ||
expect.assertions(1) | ||
const queue = new Queue(1) | ||
const promise = queue.add(() => Promise.reject('result')) | ||
await expect(promise).rejects.toBe('result') | ||
}) | ||
}) | ||
|
||
it('should handle rejected promises', async () => { | ||
expect.assertions(2) | ||
const queue = new Queue(4) | ||
const promises = [ | ||
queue.add(() => Promise.resolve()), | ||
queue.add(() => Promise.reject()), | ||
queue.add(() => Promise.resolve()), | ||
queue.add(() => Promise.reject()) | ||
] | ||
expect(queue.running).toBe(4) | ||
await Promise.allSettled(promises) | ||
expect(queue.running).toBe(0) | ||
}) | ||
|
||
it('should run tasks in LILO sequense', async () => { | ||
expect.assertions(1) | ||
const queue = new Queue(1) | ||
const order: number[] = [] | ||
const promises = [ | ||
queue.add(() => Promise.resolve().then(() => order.push(1))), | ||
queue.add(() => Promise.resolve().then(() => order.push(2))), | ||
queue.add(() => Promise.resolve().then(() => order.push(3))) | ||
] | ||
await Promise.all(promises) | ||
await delay(0) | ||
expect(order).toEqual([1, 2, 3]) | ||
}) | ||
|
||
it('should run tasks concurrently', async () => { | ||
expect.assertions(12) | ||
const queue = new Queue(4) | ||
const times: number[] = [] | ||
const startTime = Date.now() | ||
const promises = Array.from({ length: 12 }).map(() => { | ||
return queue.add(() => | ||
delay(DELAY).then(() => times.push(Date.now() - startTime)) | ||
) | ||
}) | ||
await Promise.all(promises) | ||
|
||
expect(Math.abs(times[0] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[1] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[2] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[3] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) | ||
|
||
expect(Math.abs(times[4] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[5] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[6] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[7] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) | ||
|
||
expect(Math.abs(times[8] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[9] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[10] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[11] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) | ||
}) | ||
|
||
describe('get pending', () => { | ||
it('should be able to get pending tasks count', async () => { | ||
expect.assertions(3) | ||
const queue = new Queue(2) | ||
expect(queue.pending).toBe(0) | ||
const promises = [ | ||
queue.add(() => Promise.resolve()), | ||
queue.add(() => Promise.resolve()), | ||
queue.add(() => Promise.resolve()) | ||
] | ||
// 2 task is running, 1 are pending | ||
expect(queue.pending).toBe(1) | ||
await Promise.all(promises) | ||
expect(queue.pending).toBe(0) | ||
}) | ||
}) | ||
|
||
describe('get running', () => { | ||
it('should be able to get running tasks count', async () => { | ||
expect.assertions(3) | ||
const queue = new Queue(2) | ||
expect(queue.running).toBe(0) | ||
const promises = [ | ||
queue.add(() => Promise.resolve()), | ||
queue.add(() => Promise.resolve()), | ||
queue.add(() => Promise.resolve()) | ||
] | ||
expect(queue.running).toBe(2) | ||
await Promise.all(promises) | ||
expect(queue.running).toBe(0) | ||
}) | ||
}) | ||
|
||
describe('get concurrency', () => { | ||
it('should be able to get concurrency', async () => { | ||
expect.assertions(1) | ||
const queue = new Queue(2) | ||
expect(queue.concurrency).toBe(2) | ||
}) | ||
}) | ||
describe('set concurrency', () => { | ||
it('should be able to change concurrency', async () => { | ||
expect.assertions(9) | ||
const queue = new Queue(1) | ||
const times: number[] = [] | ||
const startTime = Date.now() | ||
const promises = Array.from({ length: 5 }).map(() => { | ||
return queue.add(() => | ||
delay(DELAY).then(() => times.push(Date.now() - startTime)) | ||
) | ||
}) | ||
await delay(0) | ||
expect(queue.running).toBe(1) | ||
await promises[0] | ||
queue.concurrency = 2 | ||
expect(queue.concurrency).toBe(2) | ||
expect(queue.running).toBe(2) | ||
await Promise.all(promises) | ||
|
||
expect(Math.abs(times[0] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) | ||
|
||
expect(Math.abs(times[1] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[2] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) | ||
|
||
expect(Math.abs(times[3] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) | ||
expect(Math.abs(times[4] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) | ||
|
||
expect(queue.running).toBe(0) | ||
}) | ||
}) | ||
}) |