Skip to content

Commit

Permalink
feat(job): support debouncing (#2760)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 30, 2024
1 parent f6d29fc commit 603befe
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 29 deletions.
26 changes: 24 additions & 2 deletions lib/commands/addJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
ARGV[9] priority
ARGV[10] LIFO
ARGV[11] token
ARGV[12] debounce key
ARGV[13] debounceId
ARGV[14] debounceTtl
]]
local jobId
local jobIdKey
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/debounceJob"
--- @include "includes/getTargetQueueList"

local jobCounter = rcall("INCR", KEYS[4])
Expand All @@ -56,10 +60,28 @@ else
end
end

local debounceKey = ARGV[12]

local opts = cmsgpack.unpack(ARGV[5])

-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
local debouncedJobId = debounceJob(ARGV[1], ARGV[13], ARGV[14],
jobId, debounceKey, ARGV[11])
if debouncedJobId then
return debouncedJobId
end

local debounceId = ARGV[13]

local optionalValues = {}

if debounceId ~= "" then
table.insert(optionalValues, "deid")
table.insert(optionalValues, debounceId)
end

-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp",
ARGV[6], "delay", ARGV[7], "priority", ARGV[9], unpack(optionalValues))

-- Check if job is delayed
local delayedTimestamp = tonumber(ARGV[8])
Expand Down
14 changes: 11 additions & 3 deletions lib/commands/cleanJobsInSet-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
KEYS[2] priority key
KEYS[3] rate limiter key
ARGV[1] jobId
ARGV[1] prefix key
ARGV[2] maxTimestamp
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
Expand All @@ -16,14 +16,17 @@ local setKey = KEYS[1]
local priorityKey = KEYS[2]
local rateLimiterKey = KEYS[3]

local jobKeyPrefix = ARGV[1]
local prefixKey = ARGV[1]
local maxTimestamp = ARGV[2]
local limitStr = ARGV[3]
local setName = ARGV[4]

local isList = false
local rcall = redis.call

-- Includes
--- @include "includes/removeDebounceKey"

if setName == "wait" or setName == "active" or setName == "paused" then
isList = true
end
Expand Down Expand Up @@ -75,7 +78,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
break
end

local jobKey = jobKeyPrefix .. jobId
local jobKey = prefixKey .. jobId
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
-- Find the right timestamp of the job to compare to maxTimestamp:
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
Expand All @@ -98,6 +101,11 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
rcall("ZREM", setKey, jobId)
end
rcall("ZREM", priorityKey, jobId)

if setName ~= "completed" and setName ~= "failed" then
removeDebounceKey(prefixKey, jobKey)
end

rcall("DEL", jobKey)
rcall("DEL", jobKey .. ":logs")

Expand Down
20 changes: 20 additions & 0 deletions lib/commands/includes/debounceJob.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--[[
Function to debounce a job.
]]

local function debounceJob(prefixKey, debounceId, ttl, jobId, debounceKey, token)
if debounceId ~= "" then
local debounceKeyExists
if ttl ~= "" then
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX')
else
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX')
end
if debounceKeyExists then
local currentDebounceJobId = rcall('GET', debounceKey)
rcall("PUBLISH", prefixKey .. "debounced@" .. token, currentDebounceJobId)

return currentDebounceJobId
end
end
end
12 changes: 12 additions & 0 deletions lib/commands/includes/removeDebounceKey.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

--[[
Function to remove debounce key.
]]

local function removeDebounceKey(prefixKey, jobKey)
local debounceId = rcall("HGET", jobKey, "deid")
if debounceId then
local debounceKey = prefixKey .. "de:" .. debounceId
rcall("DEL", debounceKey)
end
end
14 changes: 14 additions & 0 deletions lib/commands/includes/removeDebounceKeyIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
--[[
Function to remove debounce key if needed.
]]

local function removeDebounceKeyIfNeeded(prefixKey, debounceId)
if debounceId then
local debounceKey = prefixKey .. "de:" .. debounceId
local pttl = rcall("PTTL", debounceKey)

if pttl == 0 or pttl == -1 then
rcall("DEL", debounceKey)
end
end
end
6 changes: 4 additions & 2 deletions lib/commands/moveStalledJobsToWait-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ local rcall = redis.call
-- Includes
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeDebounceKeyIfNeeded"

local function removeJob(jobId, baseKey)
local jobKey = baseKey .. jobId
Expand Down Expand Up @@ -78,12 +79,13 @@ if(#stalling > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
if(stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local opts = cjson.decode(rawOpts)
local jobAttributes = rcall("HMGET", jobKey, "opts", "deid")
local opts = cjson.decode(jobAttributes[1])
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", KEYS[4], ARGV[3], jobId)
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit",
"finishedOn", ARGV[3])
removeDebounceKeyIfNeeded(ARGV[2], jobAttributes[2])
rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}')

if removeOnFailType == "number" then
Expand Down
4 changes: 4 additions & 0 deletions lib/commands/moveToFinished-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ end

-- Includes
--- @include "includes/removeLock"
--- @include "includes/removeDebounceKeyIfNeeded"

if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
local errorCode = removeLock(KEYS[3], KEYS[8], ARGV[5], ARGV[1])
Expand All @@ -96,6 +97,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists

if numRemovedElements < 1 then return -3 end

local debounceId = rcall("HGET", KEYS[3], "deid")
removeDebounceKeyIfNeeded(ARGV[9], debounceId)

-- Remove job?
local keepJobs = cmsgpack.unpack(ARGV[6])
local maxCount = keepJobs['count']
Expand Down
17 changes: 11 additions & 6 deletions lib/commands/obliterate-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ local maxCount = tonumber(ARGV[1])
local baseKey = KEYS[2]

local rcall = redis.call

-- Includes
--- @include "includes/removeDebounceKey"

local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end
Expand All @@ -26,23 +30,24 @@ local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

local function removeJobs(parentKey, keys)
local function removeJobs(baseKey, keys)
for i, key in ipairs(keys) do
rcall("DEL", baseKey .. key)
rcall("DEL", baseKey .. key .. ':logs')
local jobKey = baseKey .. key
rcall("DEL", jobKey, jobKey .. ':logs')
removeDebounceKey(baseKey, jobKey)
end
maxCount = maxCount - #keys
end

local function removeListJobs(keyName, max)
local jobs = getListItems(keyName, max)
removeJobs(keyName, jobs)
removeJobs(baseKey, jobs)
rcall("LTRIM", keyName, #jobs, -1)
end

local function removeZSetJobs(keyName, max)
local jobs = getZSetItems(keyName, max)
removeJobs(keyName, jobs)
removeJobs(baseKey, jobs)
if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end
end

Expand All @@ -65,7 +70,7 @@ if (#activeJobs > 0) then
end

removeLockKeys(activeJobs)
removeJobs(activeKey, activeJobs)
removeJobs(baseKey, activeJobs)
rcall("LTRIM", activeKey, #activeJobs, -1)
if (maxCount <= 0) then return 1 end

Expand Down
28 changes: 18 additions & 10 deletions lib/commands/removeJob-10.lua → lib/commands/removeJob-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
In order to be able to remove a job, it must be unlocked.
Input:
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
KEYS[8] jobId
KEYS[9] job logs
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
KEYS[8] jobId key
KEYS[9] job logs
KEYS[10] rate limiter index table
KEYS[11] prefix key
ARGV[1] jobId
ARGV[2] lock token
Expand All @@ -24,8 +25,12 @@
-- TODO PUBLISH global event 'removed'

local rcall = redis.call

-- Includes
--- @include "includes/removeDebounceKey"

local lockKey = KEYS[8] .. ':lock'
local lock = redis.call("GET", lockKey)
local lock = rcall("GET", lockKey)
if not lock then -- or (lock == ARGV[2])) then
local jobId = ARGV[1]
rcall("LREM", KEYS[1], 0, jobId)
Expand All @@ -35,6 +40,9 @@ if not lock then -- or (lock == ARGV[2])) then
rcall("ZREM", KEYS[5], jobId)
rcall("ZREM", KEYS[6], jobId)
rcall("ZREM", KEYS[7], jobId)

removeDebounceKey(KEYS[11], KEYS[8])

rcall("DEL", KEYS[8])
rcall("DEL", KEYS[9])

Expand Down
9 changes: 8 additions & 1 deletion lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const Job = function(queue, name, data, opts) {
this.attemptsMade = 0;

this.toKey = _.bind(queue.toKey, queue);
this.debounceId = this.opts.debounce ? this.opts.debounce.id : undefined;
};

function setDefaultOpts(opts) {
Expand All @@ -82,7 +83,8 @@ function addJob(queue, client, job) {
return scripts.addJob(client, queue, jobData, {
lifo: opts.lifo,
customJobId: opts.jobId,
priority: opts.priority
priority: opts.priority,
debounce: opts.debounce
});
}

Expand Down Expand Up @@ -182,6 +184,7 @@ Job.prototype.toJSON = function() {
failedReason: this.failedReason,
stacktrace: this.stacktrace || null,
returnvalue: this.returnvalue || null,
debounceId: this.debounceId || null,
finishedOn: this.finishedOn || null,
processedOn: this.processedOn || null
};
Expand Down Expand Up @@ -641,6 +644,10 @@ Job.fromJSON = function(queue, json, jobId) {
job.returnvalue = getReturnValue(json.returnvalue);
}

if (json.deid) {
job.debounceId = json.deid;
}

return job;
};

Expand Down
23 changes: 20 additions & 3 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ const Queue = function Queue(name, url, opts) {
'limiter',
'drained',
'duplicated',
'progress'
'progress',
'de' // debounce key
],
key => {
keys[key] = this.toKey(key);
Expand Down Expand Up @@ -418,6 +419,7 @@ Queue.prototype._setupQueueEventListeners = function() {
const failedKey = this.keys.failed;
const drainedKey = this.keys.drained;
const duplicatedKey = this.keys.duplicated;
const debouncedKey = this.keys.de + 'bounced';

const pmessageHandler = (pattern, channel, message) => {
const keyAndToken = channel.split('@');
Expand Down Expand Up @@ -445,6 +447,12 @@ Queue.prototype._setupQueueEventListeners = function() {
}
utils.emitSafe(this, 'global:duplicated', message);
break;
case debouncedKey:
if (this.token === token) {
utils.emitSafe(this, 'debounced', message);
}
utils.emitSafe(this, 'global:debounced', message);
break;
}
};

Expand Down Expand Up @@ -513,7 +521,7 @@ Queue.prototype._setupQueueEventListeners = function() {
};

Queue.prototype._registerEvent = function(eventName) {
const internalEvents = ['waiting', 'delayed', 'duplicated'];
const internalEvents = ['waiting', 'delayed', 'duplicated', 'debounced'];

if (
eventName.startsWith('global:') ||
Expand All @@ -531,7 +539,7 @@ Queue.prototype._registerEvent = function(eventName) {
.isRedisReady(this.eclient)
.then(() => {
const channel = this.toKey(_eventName);
if (['active', 'waiting', 'stalled', 'duplicated'].indexOf(_eventName) !== -1) {
if (['active', 'waiting', 'stalled', 'duplicated', 'debounced'].indexOf(_eventName) !== -1) {
return (this.registeredEvents[_eventName] = this.eclient.psubscribe(
channel + '*'
));
Expand Down Expand Up @@ -782,6 +790,15 @@ Queue.prototype.retryJobs = async function(opts = {}) {
} while (cursor);
};

/**
* Removes a debounce key.
*
* @param id - identifier
*/
Queue.prototype.removeDebounceKey = (id) => {
return this.client.del(`${this.keys.de}:${id}`);
}

/**
Adds an array of jobs to the queue.
@method add
Expand Down
Loading

0 comments on commit 603befe

Please sign in to comment.