From aea890ce29610e71a2867d07cff5558db73d590d Mon Sep 17 00:00:00 2001 From: Aleksandr Grenishin Date: Mon, 1 May 2023 11:30:32 +0300 Subject: [PATCH] feat(upload-client): add `Queue` helper to make queued uploads (#481) * feat(upload-client): add `Queue` helper to make queued uploads * chore: fix lint warning * chore: refactor * chore: update readme --- packages/upload-client/README.md | 72 +++++++-- packages/upload-client/src/index.ts | 1 + packages/upload-client/src/tools/Queue.ts | 69 ++++++++ .../src/uploadFile/uploadFile.ts | 7 +- .../upload-client/test/tools/queue.test.ts | 151 ++++++++++++++++++ 5 files changed, 284 insertions(+), 16 deletions(-) create mode 100644 packages/upload-client/src/tools/Queue.ts create mode 100644 packages/upload-client/test/tools/queue.test.ts diff --git a/packages/upload-client/README.md b/packages/upload-client/README.md index 6d8f0e49a..487580486 100644 --- a/packages/upload-client/README.md +++ b/packages/upload-client/README.md @@ -23,6 +23,7 @@ Node.js and browser. - [High-Level API](#high-level-api) - [Low-Level API](#low-level-api) - [Settings](#settings) + - [Uploading queue](#uploading-queue) - [React Native](#react-native) - [Testing](#testing) - [Security issues](#security-issues) @@ -54,9 +55,7 @@ Once the UploadClient instance is created, you can start using the wrapper to upload files from binary data: ```javascript -client - .uploadFile(fileData) - .then(file => console.log(file.uuid)) +client.uploadFile(fileData).then((file) => console.log(file.uuid)) ``` Another option is uploading files from URL, via the `uploadFile` method: @@ -64,9 +63,7 @@ Another option is uploading files from URL, via the `uploadFile` method: ```javascript const fileURL = 'https://example.com/file.jpg' -client - .uploadFile(fileURL) - .then(file => console.log(file.uuid)) +client.uploadFile(fileURL).then((file) => console.log(file.uuid)) ``` You can also use the `uploadFile` method to get previously uploaded files via @@ -75,9 +72,7 @@ their UUIDs: ```javascript const fileUUID = 'edfdf045-34c0-4087-bbdd-e3834921f890' -client - .uploadFile(fileUUID) - .then(file => console.log(file.uuid)) +client.uploadFile(fileUUID).then((file) => console.log(file.uuid)) ``` You can track uploading progress: @@ -90,7 +85,7 @@ const onProgress = ({ isComputable, value }) => { client .uploadFile(fileUUID, { onProgress }) - .then(file => console.log(file.uuid)) + .then((file) => console.log(file.uuid)) ``` Note that `isComputable` flag can be `false` is some cases of uploading from the URL. @@ -105,8 +100,8 @@ const abortController = new AbortController() client .uploadFile(fileUUID, { signal: abortController.signal }) - .then(file => console.log(file.uuid)) - .catch(error => { + .then((file) => console.log(file.uuid)) + .catch((error) => { if (error.isCancel) { console.log(`File uploading was canceled.`) } @@ -194,8 +189,8 @@ const onProgress = ({ isComputable, value }) => console.log(isComputable, value) const abortController = new AbortController() base(fileData, { onProgress, signal: abortController.signal }) // fileData must be `Blob` or `File` or `Buffer` - .then(data => console.log(data.file)) - .catch(error => { + .then((data) => console.log(data.file)) + .catch((error) => { if (error.isCancel) { console.log(`File uploading was canceled.`) } @@ -421,6 +416,55 @@ Non-string values will be converted to `string`. `undefined` values will be igno See [docs][uc-file-metadata] and [REST API][uc-docs-metadata] for details. +### Uploading queue + +If you're going to upload a lot of files at once, it's useful to do it in a queue. Otherwise, a large number of simultaneous requests can clog the internet channel and slow down the process. + +To solve this problem, we provide a simple helper called `Queue`. + +Here is an example of how to use it: + +```typescript +import { Queue, uploadFile } from '@uploadcare/upload-client' + +// Create a queue with a limit of 10 concurrent requests. +const queue = new Queue(10) + +// Create an array containing 50 files. +const files = [ + ...Array(50) + .fill(0) + .map((_, idx) => Buffer.from(`content-${idx}`)) +] +const promises = files.map((file, idx) => { + const fileName = `file-${idx}.txt` + return queue + .add(() => + uploadFile(file, { + publicKey: 'YOUR_PUBLIC_KEY', + contentType: 'plain/text', + fileName + }) + ) + .then((fileInfo) => + console.log( + `"File "${fileName}" has been successfully uploaded! You can access it at the following URL: "${fileInfo.cdnUrl}"` + ) + ) +}) + +await Promise.all(promises) + +console.log('Files have been successfully uploaded') +``` + +You can pass any function that returns a promise to `queue.add`, and it will be executed concurrently. + +`queue.add` returns a promise that mimics the one passed in, meaning it will resolve or reject with the corresponding values. + +If the functionality of the built-in `Queue` is not sufficient for you, you can use any other third-party, more functional solution. + + ## React Native ### Prepare diff --git a/packages/upload-client/src/index.ts b/packages/upload-client/src/index.ts index 8288b42c7..37022b60a 100644 --- a/packages/upload-client/src/index.ts +++ b/packages/upload-client/src/index.ts @@ -77,6 +77,7 @@ export { CustomUserAgentOptions, GetUserAgentOptions } from '@uploadcare/api-client-utils' +export { Queue } from './tools/Queue' /* Types */ export { Headers, ErrorRequestInfo } from './request/types' diff --git a/packages/upload-client/src/tools/Queue.ts b/packages/upload-client/src/tools/Queue.ts new file mode 100644 index 000000000..6552ed803 --- /dev/null +++ b/packages/upload-client/src/tools/Queue.ts @@ -0,0 +1,69 @@ +type Task = () => Promise +type Resolver = (value: unknown) => void +type Rejector = (error: unknown) => void + +export class Queue { + #concurrency = 1 + #pending: Task[] = [] + #running = 0 + #resolvers: WeakMap = new WeakMap() + #rejectors: WeakMap = new WeakMap() + + constructor(concurrency: number) { + this.#concurrency = concurrency + } + + #run() { + const tasksLeft = this.#concurrency - this.#running + for (let i = 0; i < tasksLeft; i++) { + const task = this.#pending.shift() + if (!task) { + return + } + const resolver = this.#resolvers.get(task) + const rejector = this.#rejectors.get(task) + if (!resolver || !rejector) + throw new Error( + 'Unexpected behavior: resolver or rejector is undefined' + ) + this.#running += 1 + + task() + .finally(() => { + this.#resolvers.delete(task) + this.#rejectors.delete(task) + this.#running -= 1 + this.#run() + }) + .then((value) => resolver(value)) + .catch((error) => rejector(error)) + } + } + + add(task: Task): Promise { + return new Promise((resolve, reject) => { + this.#resolvers.set(task, resolve as Resolver) + this.#rejectors.set(task, reject as Rejector) + + this.#pending.push(task) + this.#run() + }) as Promise + } + + get pending() { + return this.#pending.length + } + + get running() { + return this.#running + } + + set concurrency(value: number) { + this.#concurrency = value + this.#run() + } + + get concurrency() { + return this.#concurrency + } +} diff --git a/packages/upload-client/src/uploadFile/uploadFile.ts b/packages/upload-client/src/uploadFile/uploadFile.ts index a56985d7e..95d6fcb5a 100644 --- a/packages/upload-client/src/uploadFile/uploadFile.ts +++ b/packages/upload-client/src/uploadFile/uploadFile.ts @@ -4,7 +4,11 @@ import { uploadFromUploaded } from './uploadFromUploaded' import { uploadFromUrl } from './uploadFromUrl' /* Types */ -import { CustomUserAgent, Metadata } from '@uploadcare/api-client-utils' +import { + CustomUserAgent, + Metadata, + StoreValue +} from '@uploadcare/api-client-utils' import { ProgressCallback, Url, Uuid } from '../api/types' import { getFileSize } from '../tools/getFileSize' import { isFileData } from '../tools/isFileData' @@ -13,7 +17,6 @@ import { UploadcareFile } from '../tools/UploadcareFile' import { SupportedFileInput } from '../types' import { isUrl, isUuid } from './types' import { uploadMultipart } from './uploadMultipart' -import { StoreValue } from '@uploadcare/api-client-utils' export type FileFromOptions = { publicKey: string diff --git a/packages/upload-client/test/tools/queue.test.ts b/packages/upload-client/test/tools/queue.test.ts new file mode 100644 index 000000000..8ed114c61 --- /dev/null +++ b/packages/upload-client/test/tools/queue.test.ts @@ -0,0 +1,151 @@ +import { delay } from '@uploadcare/api-client-utils' +import { Queue } from '../../src/tools/Queue' +import { expect } from '@jest/globals' + +const DELAY = 100 +const TIME_TOLERANCE = DELAY / 2 + +describe('Queue', () => { + describe('#add', () => { + it('should return promise resolved with the same value', async () => { + expect.assertions(1) + const queue = new Queue(1) + const promise = queue.add(() => Promise.resolve('result')) + await expect(promise).resolves.toBe('result') + }) + + it('should return promise rejected with the same error', async () => { + expect.assertions(1) + const queue = new Queue(1) + const promise = queue.add(() => Promise.reject('result')) + await expect(promise).rejects.toBe('result') + }) + }) + + it('should handle rejected promises', async () => { + expect.assertions(2) + const queue = new Queue(4) + const promises = [ + queue.add(() => Promise.resolve()), + queue.add(() => Promise.reject()), + queue.add(() => Promise.resolve()), + queue.add(() => Promise.reject()) + ] + expect(queue.running).toBe(4) + await Promise.allSettled(promises) + expect(queue.running).toBe(0) + }) + + it('should run tasks in LILO sequense', async () => { + expect.assertions(1) + const queue = new Queue(1) + const order: number[] = [] + const promises = [ + queue.add(() => Promise.resolve().then(() => order.push(1))), + queue.add(() => Promise.resolve().then(() => order.push(2))), + queue.add(() => Promise.resolve().then(() => order.push(3))) + ] + await Promise.all(promises) + await delay(0) + expect(order).toEqual([1, 2, 3]) + }) + + it('should run tasks concurrently', async () => { + expect.assertions(12) + const queue = new Queue(4) + const times: number[] = [] + const startTime = Date.now() + const promises = Array.from({ length: 12 }).map(() => { + return queue.add(() => + delay(DELAY).then(() => times.push(Date.now() - startTime)) + ) + }) + await Promise.all(promises) + + expect(Math.abs(times[0] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[1] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[2] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[3] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) + + expect(Math.abs(times[4] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[5] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[6] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[7] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) + + expect(Math.abs(times[8] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[9] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[10] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[11] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) + }) + + describe('get pending', () => { + it('should be able to get pending tasks count', async () => { + expect.assertions(3) + const queue = new Queue(2) + expect(queue.pending).toBe(0) + const promises = [ + queue.add(() => Promise.resolve()), + queue.add(() => Promise.resolve()), + queue.add(() => Promise.resolve()) + ] + // 2 task is running, 1 are pending + expect(queue.pending).toBe(1) + await Promise.all(promises) + expect(queue.pending).toBe(0) + }) + }) + + describe('get running', () => { + it('should be able to get running tasks count', async () => { + expect.assertions(3) + const queue = new Queue(2) + expect(queue.running).toBe(0) + const promises = [ + queue.add(() => Promise.resolve()), + queue.add(() => Promise.resolve()), + queue.add(() => Promise.resolve()) + ] + expect(queue.running).toBe(2) + await Promise.all(promises) + expect(queue.running).toBe(0) + }) + }) + + describe('get concurrency', () => { + it('should be able to get concurrency', async () => { + expect.assertions(1) + const queue = new Queue(2) + expect(queue.concurrency).toBe(2) + }) + }) + describe('set concurrency', () => { + it('should be able to change concurrency', async () => { + expect.assertions(9) + const queue = new Queue(1) + const times: number[] = [] + const startTime = Date.now() + const promises = Array.from({ length: 5 }).map(() => { + return queue.add(() => + delay(DELAY).then(() => times.push(Date.now() - startTime)) + ) + }) + await delay(0) + expect(queue.running).toBe(1) + await promises[0] + queue.concurrency = 2 + expect(queue.concurrency).toBe(2) + expect(queue.running).toBe(2) + await Promise.all(promises) + + expect(Math.abs(times[0] - DELAY * 1)).toBeLessThan(TIME_TOLERANCE) + + expect(Math.abs(times[1] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[2] - DELAY * 2)).toBeLessThan(TIME_TOLERANCE) + + expect(Math.abs(times[3] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) + expect(Math.abs(times[4] - DELAY * 3)).toBeLessThan(TIME_TOLERANCE) + + expect(queue.running).toBe(0) + }) + }) +})