Skip to content

Commit 9cea60e

Browse files
committed
Fix race condition causing many connections to RabbitMQ
1 parent cc73263 commit 9cea60e

File tree

1 file changed

+40
-17
lines changed

1 file changed

+40
-17
lines changed

functions/kubernetes/amqpConnector.js

100644100755
Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,42 @@
1+
12
const amqplib = require('amqplib'),
23
createJobMessage = require('../../common/jobMessage').createJobMessage;
3-
let channels = {};
4+
45
let conn = null;
6+
let connPromise = null;
7+
let channels = {};
8+
let channelPromises = {};
9+
10+
async function getConnection() {
11+
if (conn) return conn;
12+
if (!connPromise) {
13+
console.log("[AMQP] Creating new connection...");
14+
connPromise = amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60");
15+
}
16+
conn = await connPromise;
17+
return conn;
18+
}
519

620
async function initialize(queue_name) {
21+
const connection = await getConnection();
22+
23+
if (channels[queue_name]) return;
724

8-
if (conn === null) {
9-
conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60");
25+
if (!channelPromises[queue_name]) {
26+
channelPromises[queue_name] = (async () => {
27+
try {
28+
console.log(`[AMQP] Creating channel for queue ${queue_name}`);
29+
const ch = await connection.createChannel();
30+
await ch.assertQueue(queue_name, { durable: false, expires: 6000000 });
31+
channels[queue_name] = ch;
32+
} catch (err) {
33+
delete channelPromises[queue_name]; // retry logic
34+
throw err;
35+
}
36+
})();
1037
}
11-
let ch = await conn.createChannel()
12-
await ch.assertQueue(queue_name, {durable: false, expires: 6000000});
13-
channels[queue_name] = ch
1438

39+
await channelPromises[queue_name];
1540
}
1641

1742
function getQueueName(context) {
@@ -30,28 +55,26 @@ function getQueueName(context) {
3055

3156
async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) {
3257
let context = contextArr[0];
33-
let queue_name = getQueueName(context)
34-
if (conn === null || !(queue_name in channels)) {
35-
await initialize(queue_name)
36-
}
37-
let ch = channels[queue_name]
58+
let queue_name = getQueueName(context);
3859
try {
60+
await initialize(queue_name);
61+
let ch = channels[queue_name];
3962

40-
console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`)
63+
console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`);
4164
let tasks = [];
4265

4366
for (let i = 0; i < jobArr.length; i++) {
4467
let job = jobArr[i];
4568
let taskId = taskIdArr[i];
4669
let jobMessage = createJobMessage(job.ins, job.outs, contextArr[i], taskId);
47-
await context.sendMsgToJob(JSON.stringify(jobMessage), taskId) // TODO remove
48-
tasks.push({"id": taskId, "message": jobMessage});
70+
await context.sendMsgToJob(JSON.stringify(jobMessage), taskId); // TODO remove
71+
tasks.push({ "id": taskId, "message": jobMessage });
4972
}
5073

51-
await ch.publish('', queue_name, Buffer.from(JSON.stringify({'tasks': tasks})));
74+
ch.sendToQueue(queue_name, Buffer.from(JSON.stringify({ 'tasks': tasks })));
5275
} catch (error) {
53-
console.log(error)
76+
console.log(error);
5477
}
5578
}
5679

57-
exports.enqueueJobs = enqueueJobs
80+
exports.enqueueJobs = enqueueJobs

0 commit comments

Comments
 (0)