Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion packages/payload/src/queues/config/collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,5 @@ export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: Jo
jobLog: doc.log || [],
})
doc.input = doc.input || {}
doc.taskStatus = doc.taskStatus || {}
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary, getJobTaskStatus does the same already

return doc
}
4 changes: 0 additions & 4 deletions packages/payload/src/queues/endpoints/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ export const runJobsEndpoint: Endpoint = {
silent,
}

if (typeof queue === 'string') {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unnecessary, it's added regardless

runJobsArgs.queue = queue
}

const parsedLimit = Number(limit)
if (!isNaN(parsedLimit)) {
runJobsArgs.limit = parsedLimit
Expand Down
20 changes: 6 additions & 14 deletions packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,22 @@ export function calculateBackoffWaitUntil({
retriesConfig: number | RetryConfig
totalTried: number
}): Date {
let waitUntil: Date = getCurrentDate()
const now = getCurrentDate()
let waitUntil: Date = now

if (typeof retriesConfig === 'object') {
if (retriesConfig.backoff) {
if (retriesConfig.backoff.type === 'fixed') {
waitUntil = retriesConfig.backoff.delay
? new Date(getCurrentDate().getTime() + retriesConfig.backoff.delay)
: getCurrentDate()
? new Date(now.getTime() + retriesConfig.backoff.delay)
: now
} else if (retriesConfig.backoff.type === 'exponential') {
// 2 ^ (attempts - 1) * delay (current attempt is not included in totalTried, thus no need for -1)
const delay = retriesConfig.backoff.delay ? retriesConfig.backoff.delay : 0
waitUntil = new Date(getCurrentDate().getTime() + Math.pow(2, totalTried) * delay)
waitUntil = new Date(now.getTime() + Math.pow(2, totalTried) * delay)
}
}
}

/*
const differenceInMSBetweenNowAndWaitUntil = waitUntil.getTime() - getCurrentDate().getTime()

const differenceInSBetweenNowAndWaitUntil = differenceInMSBetweenNowAndWaitUntil / 1000
console.log('Calculated backoff', {
differenceInMSBetweenNowAndWaitUntil,
differenceInSBetweenNowAndWaitUntil,
retriesConfig,
totalTried,
})*/
return waitUntil
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import type { PayloadRequest, Where } from '../../../types/index.js'
import type { TaskType } from '../../config/types/taskTypes.js'
import type { WorkflowTypes } from '../../config/types/workflowTypes.js'

import { jobsCollectionSlug } from '../../config/collection.js'

/**
* Gets all queued jobs that can be run. This means they either:
* - failed but do not have a definitive error => can be retried
Expand Down Expand Up @@ -63,7 +65,7 @@ export async function countRunnableOrActiveJobsForQueue({
}

const runnableOrActiveJobsForQueue = await req.payload.db.count({
collection: 'payload-jobs',
collection: jobsCollectionSlug,
req,
where: {
and,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export const defaultAfterSchedule: AfterScheduleFn = async ({ jobStats, queueabl
},
},
},
updatedAt: new Date().toISOString(),
updatedAt: getCurrentDate().toISOString(),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getCurrentDate() is required for current time mocking in our test suite to work

} as JobStats,
req,
returning: false,
Expand Down
25 changes: 10 additions & 15 deletions packages/payload/src/queues/operations/runJobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -303,27 +303,22 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
}
}

/**
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).
* This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing.
*/
const { existingJobs, newJobs } = jobs.reduce(
(acc, job) => {
if (!silent || (typeof silent === 'object' && !silent.info)) {
let newCount = 0
let retryCount = 0

for (const job of jobs) {
if (job.totalTried > 0) {
acc.existingJobs.push(job)
retryCount++
} else {
acc.newJobs.push(job)
newCount++
}
return acc
},
{ existingJobs: [] as Job[], newJobs: [] as Job[] },
)
}

if (!silent || (typeof silent === 'object' && !silent.info)) {
payload.logger.info({
msg: `Running ${jobs.length} jobs.`,
new: newJobs?.length,
retrying: existingJobs?.length,
new: newCount,
retrying: retryCount,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ export const runJSONJob = async ({
return await runJSONJob({
job,
req,
silent,
updateJob,
workflowConfig,
workflowHandler,
Expand Down
3 changes: 2 additions & 1 deletion packages/payload/src/queues/utilities/updateJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Job } from '../../index.js'
import type { PayloadRequest, Sort, Where } from '../../types/index.js'

import { jobAfterRead, jobsCollectionSlug } from '../config/collection.js'
import { getCurrentDate } from './getCurrentDate.js'

type BaseArgs = {
data: Partial<Job>
Expand Down Expand Up @@ -85,7 +86,7 @@ export async function updateJobs({

if (typeof data.updatedAt === 'undefined') {
// Ensure updatedAt date is always updated
data.updatedAt = new Date().toISOString()
data.updatedAt = getCurrentDate().toISOString()
}

const args: UpdateJobsArgs = id
Expand Down
Loading