Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(job): support debouncing #2760

Merged
merged 4 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions lib/commands/addJob-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,17 @@
ARGV[9] priority
ARGV[10] LIFO
ARGV[11] token
ARGV[12] debounce key
ARGV[13] debounceId
ARGV[14] debounceTtl
]]
local jobId
local jobIdKey
local rcall = redis.call

-- Includes
--- @include "includes/addJobWithPriority"
--- @include "includes/debounceJob"
--- @include "includes/getTargetQueueList"

local jobCounter = rcall("INCR", KEYS[4])
Expand All @@ -56,10 +60,28 @@ else
end
end

local debounceKey = ARGV[12]

local opts = cmsgpack.unpack(ARGV[5])

-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
local debouncedJobId = debounceJob(ARGV[1], ARGV[13], ARGV[14],
jobId, debounceKey, ARGV[11])
if debouncedJobId then
return debouncedJobId
end

local debounceId = ARGV[13]

local optionalValues = {}

if debounceId ~= "" then
table.insert(optionalValues, "deid")
table.insert(optionalValues, debounceId)
end

-- Store the job.
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp",
ARGV[6], "delay", ARGV[7], "priority", ARGV[9], unpack(optionalValues))

-- Check if job is delayed
local delayedTimestamp = tonumber(ARGV[8])
Expand Down
14 changes: 11 additions & 3 deletions lib/commands/cleanJobsInSet-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
KEYS[2] priority key
KEYS[3] rate limiter key

ARGV[1] jobId
ARGV[1] prefix key
ARGV[2] maxTimestamp
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
Expand All @@ -16,14 +16,17 @@ local setKey = KEYS[1]
local priorityKey = KEYS[2]
local rateLimiterKey = KEYS[3]

local jobKeyPrefix = ARGV[1]
local prefixKey = ARGV[1]
local maxTimestamp = ARGV[2]
local limitStr = ARGV[3]
local setName = ARGV[4]

local isList = false
local rcall = redis.call

-- Includes
--- @include "includes/removeDebounceKey"

if setName == "wait" or setName == "active" or setName == "paused" then
isList = true
end
Expand Down Expand Up @@ -75,7 +78,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
break
end

local jobKey = jobKeyPrefix .. jobId
local jobKey = prefixKey .. jobId
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
-- Find the right timestamp of the job to compare to maxTimestamp:
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
Expand All @@ -98,6 +101,11 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
rcall("ZREM", setKey, jobId)
end
rcall("ZREM", priorityKey, jobId)

if setName ~= "completed" and setName ~= "failed" then
removeDebounceKey(prefixKey, jobKey)
end

rcall("DEL", jobKey)
rcall("DEL", jobKey .. ":logs")

Expand Down
20 changes: 20 additions & 0 deletions lib/commands/includes/debounceJob.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
--[[
Function to debounce a job.
]]

local function debounceJob(prefixKey, debounceId, ttl, jobId, debounceKey, token)
if debounceId ~= "" then
local debounceKeyExists
if ttl ~= "" then
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX')
else
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX')
end
if debounceKeyExists then
local currentDebounceJobId = rcall('GET', debounceKey)
rcall("PUBLISH", prefixKey .. "debounced@" .. token, currentDebounceJobId)

return currentDebounceJobId
end
end
end
12 changes: 12 additions & 0 deletions lib/commands/includes/removeDebounceKey.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@

--[[
Function to remove debounce key.
]]

local function removeDebounceKey(prefixKey, jobKey)
local debounceId = rcall("HGET", jobKey, "deid")
if debounceId then
local debounceKey = prefixKey .. "de:" .. debounceId
rcall("DEL", debounceKey)
end
end
14 changes: 14 additions & 0 deletions lib/commands/includes/removeDebounceKeyIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
--[[
Function to remove debounce key if needed.
]]

local function removeDebounceKeyIfNeeded(prefixKey, debounceId)
if debounceId then
local debounceKey = prefixKey .. "de:" .. debounceId
local pttl = rcall("PTTL", debounceKey)

if pttl == 0 or pttl == -1 then
rcall("DEL", debounceKey)
end
end
end
6 changes: 4 additions & 2 deletions lib/commands/moveStalledJobsToWait-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ local rcall = redis.call
-- Includes
--- @include "includes/batches"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeDebounceKeyIfNeeded"

local function removeJob(jobId, baseKey)
local jobKey = baseKey .. jobId
Expand Down Expand Up @@ -78,12 +79,13 @@ if(#stalling > 0) then
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
if(stalledCount > MAX_STALLED_JOB_COUNT) then
local rawOpts = rcall("HGET", jobKey, "opts")
local opts = cjson.decode(rawOpts)
local jobAttributes = rcall("HMGET", jobKey, "opts", "deid")
local opts = cjson.decode(jobAttributes[1])
local removeOnFailType = type(opts["removeOnFail"])
rcall("ZADD", KEYS[4], ARGV[3], jobId)
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit",
"finishedOn", ARGV[3])
removeDebounceKeyIfNeeded(ARGV[2], jobAttributes[2])
rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}')

if removeOnFailType == "number" then
Expand Down
4 changes: 4 additions & 0 deletions lib/commands/moveToFinished-9.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ end

-- Includes
--- @include "includes/removeLock"
--- @include "includes/removeDebounceKeyIfNeeded"

if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
local errorCode = removeLock(KEYS[3], KEYS[8], ARGV[5], ARGV[1])
Expand All @@ -96,6 +97,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists

if numRemovedElements < 1 then return -3 end

local debounceId = rcall("HGET", KEYS[3], "deid")
removeDebounceKeyIfNeeded(ARGV[9], debounceId)

-- Remove job?
local keepJobs = cmsgpack.unpack(ARGV[6])
local maxCount = keepJobs['count']
Expand Down
17 changes: 11 additions & 6 deletions lib/commands/obliterate-2.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ local maxCount = tonumber(ARGV[1])
local baseKey = KEYS[2]

local rcall = redis.call

-- Includes
--- @include "includes/removeDebounceKey"

local function getListItems(keyName, max)
return rcall('LRANGE', keyName, 0, max - 1)
end
Expand All @@ -26,23 +30,24 @@ local function getZSetItems(keyName, max)
return rcall('ZRANGE', keyName, 0, max - 1)
end

local function removeJobs(parentKey, keys)
Copy link
Collaborator Author

@roggervalf roggervalf Jul 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parentKey is wrong, this is an issue in this script that is being fixed in this pr

local function removeJobs(baseKey, keys)
for i, key in ipairs(keys) do
rcall("DEL", baseKey .. key)
rcall("DEL", baseKey .. key .. ':logs')
local jobKey = baseKey .. key
rcall("DEL", jobKey, jobKey .. ':logs')
removeDebounceKey(baseKey, jobKey)
end
maxCount = maxCount - #keys
end

local function removeListJobs(keyName, max)
local jobs = getListItems(keyName, max)
removeJobs(keyName, jobs)
removeJobs(baseKey, jobs)
rcall("LTRIM", keyName, #jobs, -1)
end

local function removeZSetJobs(keyName, max)
local jobs = getZSetItems(keyName, max)
removeJobs(keyName, jobs)
removeJobs(baseKey, jobs)
if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end
end

Expand All @@ -65,7 +70,7 @@ if (#activeJobs > 0) then
end

removeLockKeys(activeJobs)
removeJobs(activeKey, activeJobs)
removeJobs(baseKey, activeJobs)
rcall("LTRIM", activeKey, #activeJobs, -1)
if (maxCount <= 0) then return 1 end

Expand Down
28 changes: 18 additions & 10 deletions lib/commands/removeJob-10.lua → lib/commands/removeJob-11.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
In order to be able to remove a job, it must be unlocked.

Input:
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
KEYS[8] jobId
KEYS[9] job logs
KEYS[1] 'active',
KEYS[2] 'wait',
KEYS[3] 'delayed',
KEYS[4] 'paused',
KEYS[5] 'completed',
KEYS[6] 'failed',
KEYS[7] 'priority',
KEYS[8] jobId key
KEYS[9] job logs
KEYS[10] rate limiter index table
KEYS[11] prefix key

ARGV[1] jobId
ARGV[2] lock token
Expand All @@ -24,8 +25,12 @@
-- TODO PUBLISH global event 'removed'

local rcall = redis.call

-- Includes
--- @include "includes/removeDebounceKey"

local lockKey = KEYS[8] .. ':lock'
local lock = redis.call("GET", lockKey)
local lock = rcall("GET", lockKey)
if not lock then -- or (lock == ARGV[2])) then
local jobId = ARGV[1]
rcall("LREM", KEYS[1], 0, jobId)
Expand All @@ -35,6 +40,9 @@ if not lock then -- or (lock == ARGV[2])) then
rcall("ZREM", KEYS[5], jobId)
rcall("ZREM", KEYS[6], jobId)
rcall("ZREM", KEYS[7], jobId)

removeDebounceKey(KEYS[11], KEYS[8])

rcall("DEL", KEYS[8])
rcall("DEL", KEYS[9])

Expand Down
9 changes: 8 additions & 1 deletion lib/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const Job = function(queue, name, data, opts) {
this.attemptsMade = 0;

this.toKey = _.bind(queue.toKey, queue);
this.debounceId = this.opts.debounce ? this.opts.debounce.id : undefined;
};

function setDefaultOpts(opts) {
Expand All @@ -82,7 +83,8 @@ function addJob(queue, client, job) {
return scripts.addJob(client, queue, jobData, {
lifo: opts.lifo,
customJobId: opts.jobId,
priority: opts.priority
priority: opts.priority,
debounce: opts.debounce
});
}

Expand Down Expand Up @@ -182,6 +184,7 @@ Job.prototype.toJSON = function() {
failedReason: this.failedReason,
stacktrace: this.stacktrace || null,
returnvalue: this.returnvalue || null,
debounceId: this.debounceId || null,
finishedOn: this.finishedOn || null,
processedOn: this.processedOn || null
};
Expand Down Expand Up @@ -641,6 +644,10 @@ Job.fromJSON = function(queue, json, jobId) {
job.returnvalue = getReturnValue(json.returnvalue);
}

if (json.deid) {
job.debounceId = json.deid;
}

return job;
};

Expand Down
23 changes: 20 additions & 3 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ const Queue = function Queue(name, url, opts) {
'limiter',
'drained',
'duplicated',
'progress'
'progress',
'de' // debounce key
],
key => {
keys[key] = this.toKey(key);
Expand Down Expand Up @@ -418,6 +419,7 @@ Queue.prototype._setupQueueEventListeners = function() {
const failedKey = this.keys.failed;
const drainedKey = this.keys.drained;
const duplicatedKey = this.keys.duplicated;
const debouncedKey = this.keys.de + 'bounced';

const pmessageHandler = (pattern, channel, message) => {
const keyAndToken = channel.split('@');
Expand Down Expand Up @@ -445,6 +447,12 @@ Queue.prototype._setupQueueEventListeners = function() {
}
utils.emitSafe(this, 'global:duplicated', message);
break;
case debouncedKey:
if (this.token === token) {
utils.emitSafe(this, 'debounced', message);
}
utils.emitSafe(this, 'global:debounced', message);
break;
}
};

Expand Down Expand Up @@ -513,7 +521,7 @@ Queue.prototype._setupQueueEventListeners = function() {
};

Queue.prototype._registerEvent = function(eventName) {
const internalEvents = ['waiting', 'delayed', 'duplicated'];
const internalEvents = ['waiting', 'delayed', 'duplicated', 'debounced'];

if (
eventName.startsWith('global:') ||
Expand All @@ -531,7 +539,7 @@ Queue.prototype._registerEvent = function(eventName) {
.isRedisReady(this.eclient)
.then(() => {
const channel = this.toKey(_eventName);
if (['active', 'waiting', 'stalled', 'duplicated'].indexOf(_eventName) !== -1) {
if (['active', 'waiting', 'stalled', 'duplicated', 'debounced'].indexOf(_eventName) !== -1) {
return (this.registeredEvents[_eventName] = this.eclient.psubscribe(
channel + '*'
));
Expand Down Expand Up @@ -782,6 +790,15 @@ Queue.prototype.retryJobs = async function(opts = {}) {
} while (cursor);
};

/**
* Removes a debounce key.
*
* @param id - identifier
*/
Queue.prototype.removeDebounceKey = (id) => {
return this.client.del(`${this.keys.de}:${id}`);
}

/**
Adds an array of jobs to the queue.
@method add
Expand Down
Loading
Loading