Skip to content

Commit

Permalink
fix: ensure (n)ack is send on same channel
Browse files Browse the repository at this point in the history
  • Loading branch information
tada5hi committed Jun 11, 2024
1 parent 906ed81 commit ab7026f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 9 deletions.
5 changes: 2 additions & 3 deletions src/consume/type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@
* view the LICENSE file that was distributed with this source code.
*/

import type { ChannelWrapper } from 'amqp-connection-manager';
import type { ConsumeMessage, Options } from 'amqplib';
import type { Channel, ConsumeMessage, Options } from 'amqplib';
import type { ConsumeHandlerAnyKey } from './constants';

export {
ConsumeMessage,
};

export type ConsumeMessageHandler = (message: ConsumeMessage, channel: ChannelWrapper) => Promise<void> | void;
export type ConsumeMessageHandler = (message: ConsumeMessage, channel: Channel) => Promise<void> | void;

export type ConsumeHandlers = {
[ConsumeHandlerAnyKey]?: ConsumeMessageHandler,
Expand Down
15 changes: 9 additions & 6 deletions src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ export class Client {
}

const requeueOnFailure = options.requeueOnFailure ?? false;
const handleMessage = async (message: ConsumeMessage | null) => {
const handleMessage = async (
message: ConsumeMessage | null,
ch: Channel,
) => {
if (!message) {
return;
}
Expand All @@ -109,16 +112,16 @@ export class Client {
handlers[ConsumeHandlerAnyKey];

if (typeof handler === 'undefined') {
channel.nack(message, undefined, requeueOnFailure);
ch.nack(message, undefined, requeueOnFailure);
return;
}

try {
await handler(message, channel);
await handler(message, ch);

channel.ack(message);
ch.ack(message);
} catch (e) {
channel.nack(message, undefined, requeueOnFailure);
ch.nack(message, undefined, requeueOnFailure);
}
};

Expand Down Expand Up @@ -164,7 +167,7 @@ export class Client {

await channel.consume(
queueName,
(message) => handleMessage(message),
(message) => handleMessage(message, channel),
options,
);
},
Expand Down

0 comments on commit ab7026f

Please sign in to comment.