Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(scripts): throw error when moving non-active job to delayed #2740

Merged
merged 4 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions lib/commands/includes/removeLock.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local function removeLock(jobKey, stalledKey, token, jobId)
if token ~= "0" then
local lockKey = jobKey .. ':lock'
local lockToken = rcall("GET", lockKey)
if lockToken == token then
rcall("DEL", lockKey)
rcall("SREM", stalledKey, jobId)
else
if lockToken then
-- Lock exists but token does not match
return -6
else
-- Lock is missing completely
return -2
end
end
end
return 0
end
19 changes: 9 additions & 10 deletions lib/commands/moveToDelayed-4.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@
]]
local rcall = redis.call

-- Includes
--- @include "includes/removeLock"

if rcall("EXISTS", KEYS[3]) == 1 then
-- Check for job lock
if ARGV[3] ~= "0" then
local lockKey = KEYS[3] .. ':lock'
if rcall("GET", lockKey) == ARGV[3] then
rcall("DEL", lockKey)
rcall("SREM", KEYS[4], ARGV[2])
else
return -2
end
local errorCode = removeLock(KEYS[3], KEYS[4], ARGV[3], ARGV[2])
if errorCode < 0 then
return errorCode
end

local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[2])
if numRemovedElements < 1 then return -3 end

local score = tonumber(ARGV[1])
rcall("ZADD", KEYS[2], score, ARGV[2])
rcall("PUBLISH", KEYS[2], (score / 0x1000))
rcall("LREM", KEYS[1], 0, ARGV[2])

return 0
else
Expand Down
2 changes: 1 addition & 1 deletion lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ Job.prototype.moveToFailed = async function(err, ignoreLock) {
const results = await multi.exec();
const code = _.last(results)[1];
if (code < 0) {
throw scripts.finishedErrors(code, this.id, command);
throw scripts.finishedErrors(code, this.id, command, 'active');
}
};

Expand Down
8 changes: 6 additions & 2 deletions lib/scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -238,20 +238,24 @@ const scripts = {
);
return job.queue.client.moveToFinished(args).then(result => {
if (result < 0) {
throw scripts.finishedErrors(result, job.id, 'finished');
throw scripts.finishedErrors(result, job.id, 'finished', 'active');
} else if (result) {
return raw2jobData(result);
}
return 0;
});
},

finishedErrors(code, jobId, command) {
finishedErrors(code, jobId, command, state) {
switch (code) {
case -1:
return new Error('Missing key for job ' + jobId + ' ' + command);
case -2:
return new Error('Missing lock for job ' + jobId + ' ' + command);
case -3:
return new Error(`Job ${jobId} is not in the ${state} state. ${command}`);
case -6:
return new Error(`Lock mismatch for job ${jobId}. Cmd ${command} from ${state}`);
}
},

Expand Down
11 changes: 9 additions & 2 deletions test/test_job.js
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,9 @@ describe('Job', () => {
.then(isFailed => {
expect(isFailed).to.be(false);
})
.then(() => {
return scripts.moveToActive(queue);
})
.then(() => {
return job.moveToFailed(new Error('test error'), true);
})
Expand Down Expand Up @@ -893,7 +896,9 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('completed');
return client.zrem(queue.toKey('completed'), job.id);
return client.zrem(queue.toKey('completed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
});
})
.then(() => {
return job.moveToDelayed(Date.now() + 10000, true);
Expand All @@ -907,7 +912,9 @@ describe('Job', () => {
})
.then(state => {
expect(state).to.be('delayed');
return client.zrem(queue.toKey('delayed'), job.id);
return client.zrem(queue.toKey('delayed'), job.id).then(()=>{
return client.lpush(queue.toKey('active'), job.id)
});
})
.then(() => {
return job.moveToFailed(new Error('test'), true);
Expand Down
Loading