Skip to content

Commit 7fc567b

Browse files
authored
Merge pull request #78 from hyperflow-wms/amqp-k8s-command
Amqp k8s command
2 parents 17801df + 6802960 commit 7fc567b

File tree

4 files changed

+126
-41
lines changed

4 files changed

+126
-41
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
const amqplib = require('amqplib'),
2+
createJobMessage = require('../../common/jobMessage').createJobMessage;
3+
let channels = {};
4+
let conn = null;
5+
6+
async function initialize(queue_name) {
7+
8+
if (conn === null) {
9+
conn = await amqplib.connect(`amqp://${process.env.RABBIT_HOSTNAME}`, "heartbeat=60");
10+
}
11+
let ch = await conn.createChannel()
12+
await ch.assertQueue(queue_name, {durable: false, expires: 6000000});
13+
channels[queue_name] = ch
14+
15+
}
16+
17+
function getQueueName(context) {
18+
if ("executionModels" in context.appConfig) {
19+
for (const taskType of context.appConfig.executionModels) {
20+
if (taskType.name === context['name']) {
21+
if ("queue" in taskType) {
22+
return taskType.queue;
23+
}
24+
}
25+
}
26+
}
27+
let namespace = process.env.HF_VAR_NAMESPACE || 'default'
28+
return namespace + "." + context['name']
29+
}
30+
31+
async function enqueueJobs(jobArr, taskIdArr, contextArr, customParams) {
32+
let context = contextArr[0];
33+
let queue_name = getQueueName(context)
34+
if (conn === null || !(queue_name in channels)) {
35+
await initialize(queue_name)
36+
}
37+
let ch = channels[queue_name]
38+
try {
39+
40+
console.log(`jobArr: ${JSON.stringify(jobArr)}, taskIdArr: ${JSON.stringify(taskIdArr)}, contextArr: ${JSON.stringify(contextArr)}, customParams: ${JSON.stringify(customParams)}`)
41+
let tasks = [];
42+
43+
for (let i = 0; i < jobArr.length; i++) {
44+
let job = jobArr[i];
45+
let taskId = taskIdArr[i];
46+
let jobMessage = createJobMessage(job.ins, job.outs, contextArr[i], taskId);
47+
await context.sendMsgToJob(JSON.stringify(jobMessage), taskId) // TODO remove
48+
tasks.push({"id": taskId, "message": jobMessage});
49+
}
50+
51+
await ch.publish('', queue_name, Buffer.from(JSON.stringify({'tasks': tasks})));
52+
} catch (error) {
53+
console.log(error)
54+
}
55+
}
56+
57+
exports.enqueueJobs = enqueueJobs
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
2+
async function synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn) {
3+
4+
let context = contextArr[0];
5+
// 'awaitJob' -- wait for the job to finish, possibly restarting it
6+
// Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
7+
var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
8+
var restartPolicy = backoffLimit > 0 ? "OnFailure" : "Never";
9+
var restartCount = 0;
10+
var awaitJob = async (taskId) => {
11+
try {
12+
var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
13+
} catch (err) {
14+
console.error(err);
15+
throw err;
16+
}
17+
let taskEnd = new Date().toISOString();
18+
console.log('Job ended with result:', jobResult, 'time:', taskEnd);
19+
// job exit code
20+
return parseInt(jobResult[1]);
21+
}
22+
23+
var awaitJobs = async (taskIdArr) => {
24+
let awaitPromises = []
25+
for (var i = 0; i < taskIdArr.length; i++) {
26+
awaitPromises.push(awaitJob(taskIdArr[i]));
27+
}
28+
return Promise.all(awaitPromises);
29+
}
30+
31+
let jobExitCodes = await awaitJobs(taskIdArr);
32+
for (let i = 0; i < jobExitCodes.length; i++) {
33+
let jobExitCode = jobExitCodes[i];
34+
let taskId = taskIdArr[i];
35+
if (jobExitCode !== 0) {
36+
console.log("Job", taskId, "failed");
37+
restartFn(i);
38+
// NOTE: job message is preserved, so we don't have to send it again.
39+
}
40+
}
41+
42+
return jobExitCodes;
43+
44+
}
45+
46+
exports.synchronizeJobs = synchronizeJobs

functions/kubernetes/k8sCommand.js

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ const k8s = require('@kubernetes/client-node');
44
var BufferManager = require('./buffer_manager.js').BufferManager;
55
var RestartCounter = require('./restart_counter.js').RestartCounter;
66
var submitK8sJob = require('./k8sJobSubmit.js').submitK8sJob;
7+
var amqpEnqueueJobs = require('./amqpConnector.js').enqueueJobs;
8+
var synchronizeJobs = require('./jobSynchronization').synchronizeJobs
79
var fs = require('fs');
810

911
let bufferManager = new BufferManager();
@@ -19,6 +21,18 @@ let restartCounter = new RestartCounter(backoffLimit);
1921
// * outs
2022
// * context
2123
// * cb
24+
25+
function getExecutorType(context) {
26+
if ("executionModels" in context.appConfig) {
27+
for (const taskType of context.appConfig.executionModels) {
28+
if (taskType.name === context['name']) {
29+
return "WORKER_POOL"
30+
}
31+
}
32+
}
33+
return "JOB"
34+
}
35+
2236
async function k8sCommandGroup(bufferItems) {
2337

2438
// No action needed when buffer is empty
@@ -112,7 +126,12 @@ async function k8sCommandGroup(bufferItems) {
112126

113127
let jobExitCodes = [];
114128
try {
115-
jobExitCodes = await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn);
129+
if (getExecutorType(context) === "WORKER_POOL") {
130+
await amqpEnqueueJobs(jobArr, taskIdArr, contextArr, customParams)
131+
} else {
132+
await submitK8sJob(kubeconfig, jobArr, taskIdArr, contextArr, customParams)
133+
}
134+
jobExitCodes = await synchronizeJobs(jobArr, taskIdArr, contextArr, customParams, restartFn);
116135
} catch (err) {
117136
console.log("Error when submitting job:", err);
118137
throw err;

functions/kubernetes/k8sJobSubmit.js

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
5858
var volumePath = '/work_dir';
5959
var jobName = Math.random().toString(36).substring(7) + '-' +
6060
job.name.replace(/_/g, '-') + "-" + context.procId + '-' + context.firingId;
61+
var workingDirPath = context.workdir;
6162

6263
// remove chars not allowd in Pod names
6364
jobName = jobName.replace(/[^0-9a-z-]/gi, '').toLowerCase();
@@ -80,7 +81,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
8081
restartPolicy: restartPolicy, backoffLimit: backoffLimit,
8182
experimentId: context.hfId + ":" + context.appId,
8283
workflowName: context.wfname, taskName: job.name,
83-
appId: context.appId
84+
appId: context.appId, workingDirPath: workingDirPath
8485
}
8586

8687
// Add/override custom parameters for the job
@@ -104,7 +105,7 @@ var createK8sJobYaml = (job, taskIds, context, jobYamlTemplate, customParams) =>
104105
//
105106
//
106107
// Returns: job exit code
107-
var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams, restartFn) => {
108+
var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams) => {
108109

109110
// Load definition of the the worker job pod
110111
// File 'job-template.yaml' should be provided externally during deployment
@@ -193,44 +194,6 @@ var submitK8sJob = async(kubeconfig, jobArr, taskIdArr, contextArr, customParams
193194
throw err;
194195
}
195196

196-
// 'awaitJob' -- wait for the job to finish, possibly restarting it
197-
// Restart policy -- enable if "HF_VAR_BACKOFF_LIMIT" (number of retries) is defined
198-
var backoffLimit = process.env.HF_VAR_BACKOFF_LIMIT || 0;
199-
var restartPolicy = backoffLimit > 0 ? "OnFailure": "Never";
200-
var restartCount = 0;
201-
var awaitJob = async(taskId) => {
202-
try {
203-
var jobResult = await context.jobResult(0, taskId); // timeout=0 means indefinite
204-
} catch (err) {
205-
console.error(err);
206-
throw err;
207-
}
208-
let taskEnd = new Date().toISOString();
209-
console.log('Job ended with result:', jobResult, 'time:', taskEnd);
210-
var code = parseInt(jobResult[1]); // job exit code
211-
return code;
212-
}
213-
214-
var awaitJobs = async(taskIdArr) => {
215-
let awaitPromises = []
216-
for (var i=0; i<taskIdArr.length; i++) {
217-
awaitPromises.push(awaitJob(taskIdArr[i]));
218-
}
219-
return Promise.all(awaitPromises);
220-
}
221-
222-
let jobExitCodes = await awaitJobs(taskIdArr);
223-
for (let i = 0; i < jobExitCodes.length; i++) {
224-
let jobExitCode = jobExitCodes[i];
225-
let taskId = taskIdArr[i];
226-
if (jobExitCode != 0) {
227-
console.log("Job", taskId, "failed");
228-
restartFn(i);
229-
// NOTE: job message is preserved, so we don't have to send it again.
230-
}
231-
}
232-
233-
return jobExitCodes;
234197
}
235198

236199
exports.submitK8sJob = submitK8sJob;

0 commit comments

Comments
 (0)