-
Notifications
You must be signed in to change notification settings - Fork 118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: improve pg write queue #1878
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1343,14 +1343,26 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities { | |
*/ | ||
export class PgWriteQueue { | ||
readonly queue: PQueue; | ||
private tasks: Promise<unknown>[]; | ||
constructor() { | ||
const concurrency = Math.max(1, getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4)); | ||
this.queue = new PQueue({ concurrency, autoStart: true }); | ||
this.tasks = []; | ||
} | ||
enqueue(task: Parameters<PQueue['add']>[0]): void { | ||
void this.queue.add(task); | ||
const p = this.queue.add(task); | ||
p.catch(e => logger.error(e, 'PgWriteQueue task failed')); | ||
this.tasks.push(p); | ||
} | ||
done(): Promise<void> { | ||
return this.queue.onIdle(); | ||
async done(): Promise<void> { | ||
// https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't believe the problem described in this post applies to the tasks handled by this queue for a couple of reasons:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
What I'm seeing is, there was an unhandledRejection and then the process restarted. |
||
const results = await Promise.allSettled(this.tasks); | ||
this.tasks = []; | ||
const firstRejected = results.find(v => v.status === 'rejected') as | ||
| PromiseRejectedResult | ||
| undefined; | ||
if (firstRejected != null) { | ||
throw firstRejected.reason; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this catch needed, or would it be handled later in the
throw firstRejected.reason
code?If the error is logged here, it would have no meaningful stack trace compared to the below code which preserves the async call stack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there's no catch, the process will be terminated by an unhandledRejection.
The error is logged here because
throw firstRejected.reason
only throws the first one, we don't have any log for the other errors.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah okay, would it make sense to use the p-queue error event instead? https://www.npmjs.com/package/p-queue#error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is what we saw using a node built from this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, we can manually capture the errors ourself with
.catch
to prevent aterminated by an unhandledRejection
. But could we instead use the build-in p-queue errror handler, e.g.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not available before v7.0.0: https://github.com/sindresorhus/p-queue/tree/v6.6.2?tab=readme-ov-file#events