Skip to content

Commit

Permalink
chore: wip
Browse files Browse the repository at this point in the history
  • Loading branch information
glennmichael123 committed Jan 8, 2025
1 parent ca0d243 commit 544b5af
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 35 deletions.
83 changes: 52 additions & 31 deletions storage/framework/core/queue/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,57 +100,78 @@ export class Queue implements Dispatchable {
async dispatch(): Promise<void> {
const queueName = this.options.queue || 'default'

if (['database', 'redis'].includes(queueDriver)) {
await storeJob(this.name, {
queue: queueName,
payload: this.payload,
context: this.options.context,
maxTries: this.options.maxTries,
timeout: this.options.timeout,
backoff: this.options.backoff,
delay: this.options.delay
})
const jobPayload = this.createJobPayload(queueName)

process.exit(0)
if (this.isQueuedDriver()) {
await this.storeQueuedJob(jobPayload)
return
}

if (this.options.afterResponse) {
process.on('beforeExit', async () => {
await this.dispatchNow()
})
this.deferAfterResponse()
return
}

if (this.options.delay) {
setTimeout(async () => {
await this.dispatchNow()
}, this.options.delay * 1000)
this.deferWithDelay()
return
}

try {
await this.runJobImmediately(jobPayload)
}

await runJob(this.name, {
queue: queueName,
payload: this.payload,
context: this.options.context,
maxTries: this.options.maxTries,
timeout: this.options.timeout,
backoff: this.options.backoff,
})
private isQueuedDriver(): boolean {
return ['database', 'redis'].includes(queueDriver)
}

if (this.options.chainedJobs?.length) {
for (const job of this.options.chainedJobs) {
await job.dispatch()
}
}
private createJobPayload(queueName: string): any {
return {
queue: queueName,
payload: this.payload,
context: this.options.context,
maxTries: this.options.maxTries,
timeout: this.options.timeout,
backoff: this.options.backoff,
delay: this.options.delay,
}
}

private async storeQueuedJob(jobPayload: any): Promise<void> {
await storeJob(this.name, jobPayload)
}

private deferAfterResponse(): void {
process.on('beforeExit', async () => {
await this.dispatchNow()
})
}

private deferWithDelay(): void {
setTimeout(async () => {
await this.dispatchNow()
}, this.options.delay * 1000)
}

private async runJobImmediately(jobPayload: any): Promise<void> {
try {
await runJob(this.name, jobPayload)
await this.runChainedJobs()
}
catch (error) {
log.error(`Failed to dispatch job ${this.name}:`, error)
throw error
}
}

private async runChainedJobs(): Promise<void> {
if (!this.options.chainedJobs?.length)
return

for (const job of this.options.chainedJobs) {
await job.dispatch()
}
}

async dispatchNow(): Promise<void> {
try {
await runJob(this.name, {
Expand Down
13 changes: 10 additions & 3 deletions storage/framework/core/queue/src/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@ interface QueuePayload {
}

export async function processJobs(): Promise<Ok<string, never>> {
setInterval( async () => {
setInterval(async () => {
await executeJobs()
}, 1000)


return ok('All jobs processed successfully!')
}

async function executeJobs() {
async function executeJobs(): Promise<void> {
const jobs = await Job.all()

for (const job of jobs) {
if (job.payload) {
if (job.available_at && job.available_at > timestampNow())
return

const payload: QueuePayload = JSON.parse(job.payload)
const currentAttempts = job.attempts || 0
log.info(`Running ${payload.displayName}`)
Expand All @@ -51,3 +53,8 @@ async function executeJobs() {
}
}
}

function timestampNow(): number {
const now = Date.now()
return Math.floor(now / 1000)
}
2 changes: 1 addition & 1 deletion storage/framework/core/queue/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export async function storeJob(name: string, options: QueueOption): Promise<void
queue: options.queue,
payload: payloadJson,
attempts: 0,
available_at: generateUnixTimestamp(options.delay || 0)
available_at: generateUnixTimestamp(options.delay || 0),
}

await Job.create(job)
Expand Down

0 comments on commit 544b5af

Please sign in to comment.