Skip to content

Commit 5e999c8

Browse files
authored
Use p-map to speed up bulk actions (#2272)
1 parent 569cecb commit 5e999c8

File tree

4 files changed

+53
-40
lines changed

4 files changed

+53
-40
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,7 @@ const EnvironmentSchema = z.object({
973973
// Bulk action
974974
BULK_ACTION_BATCH_SIZE: z.coerce.number().int().default(100),
975975
BULK_ACTION_BATCH_DELAY_MS: z.coerce.number().int().default(200),
976+
BULK_ACTION_SUBBATCH_CONCURRENCY: z.coerce.number().int().default(5),
976977
});
977978

978979
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts

Lines changed: 48 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { timeFilters } from "~/components/runs/v3/SharedFilters";
2424
import parseDuration from "parse-duration";
2525
import { v3BulkActionPath } from "~/utils/pathBuilder";
2626
import { formatDateTime } from "~/components/primitives/DateTime";
27+
import pMap from "p-map";
2728

2829
export class BulkActionService extends BaseService {
2930
public async create(
@@ -191,29 +192,33 @@ export class BulkActionService extends BaseService {
191192
},
192193
});
193194

194-
for (const run of runs) {
195-
const [error, result] = await tryCatch(
196-
cancelService.call(run, {
197-
reason: `Bulk action ${group.friendlyId} cancelled run`,
198-
bulkActionId: bulkActionId,
199-
})
200-
);
201-
if (error) {
202-
logger.error("Failed to cancel run", {
203-
error,
204-
runId: run.id,
205-
status: run.status,
206-
});
195+
await pMap(
196+
runs,
197+
async (run) => {
198+
const [error, result] = await tryCatch(
199+
cancelService.call(run, {
200+
reason: `Bulk action ${group.friendlyId} cancelled run`,
201+
bulkActionId: bulkActionId,
202+
})
203+
);
204+
if (error) {
205+
logger.error("Failed to cancel run", {
206+
error,
207+
runId: run.id,
208+
status: run.status,
209+
});
207210

208-
failureCount++;
209-
} else {
210-
if (!result || result.alreadyFinished) {
211211
failureCount++;
212212
} else {
213-
successCount++;
213+
if (!result || result.alreadyFinished) {
214+
failureCount++;
215+
} else {
216+
successCount++;
217+
}
214218
}
215-
}
216-
}
219+
},
220+
{ concurrency: env.BULK_ACTION_SUBBATCH_CONCURRENCY }
221+
);
217222

218223
break;
219224
}
@@ -228,33 +233,37 @@ export class BulkActionService extends BaseService {
228233
},
229234
});
230235

231-
for (const run of runs) {
232-
const [error, result] = await tryCatch(
233-
replayService.call(run, {
234-
bulkActionId: bulkActionId,
235-
})
236-
);
237-
if (error) {
238-
logger.error("Failed to replay run, error", {
239-
error,
240-
runId: run.id,
241-
status: run.status,
242-
});
243-
244-
failureCount++;
245-
} else {
246-
if (!result) {
247-
logger.error("Failed to replay run, no result", {
236+
await pMap(
237+
runs,
238+
async (run) => {
239+
const [error, result] = await tryCatch(
240+
replayService.call(run, {
241+
bulkActionId: bulkActionId,
242+
})
243+
);
244+
if (error) {
245+
logger.error("Failed to replay run, error", {
246+
error,
248247
runId: run.id,
249248
status: run.status,
250249
});
251250

252251
failureCount++;
253252
} else {
254-
successCount++;
253+
if (!result) {
254+
logger.error("Failed to replay run, no result", {
255+
runId: run.id,
256+
status: run.status,
257+
});
258+
259+
failureCount++;
260+
} else {
261+
successCount++;
262+
}
255263
}
256-
}
257-
}
264+
},
265+
{ concurrency: env.BULK_ACTION_SUBBATCH_CONCURRENCY }
266+
);
258267
break;
259268
}
260269
}

apps/webapp/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@
156156
"ohash": "^1.1.3",
157157
"openai": "^4.33.1",
158158
"p-limit": "^6.2.0",
159+
"p-map": "^6.0.0",
159160
"parse-duration": "^2.1.0",
160161
"posthog-js": "^1.93.3",
161162
"posthog-node": "4.17.1",

pnpm-lock.yaml

Lines changed: 3 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)