Skip to content

Commit

Permalink
Optimizations and fixes for clearing clients
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon Grondin committed Feb 25, 2019
1 parent 582db61 commit 71debfb
Show file tree
Hide file tree
Showing 18 changed files with 311 additions and 119 deletions.
69 changes: 43 additions & 26 deletions es5.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions lib/Bottleneck.js
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ Bottleneck = function () {
Promise: Promise,
timeout: null,
heartbeatInterval: 5000,
clientTimeout: 10000,
clientOptions: {},
clusterNodes: null,
clearDatastore: false,
Expand Down
23 changes: 15 additions & 8 deletions lib/RedisDatastore.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ RedisDatastore = class RedisDatastore {
}).then(() => {
return this.connection.__addLimiter__(this.instance);
}).then(() => {
return this.runScript("heartbeat", []);
return this.runScript("register_client", [this.instance.queued()]);
}).then(() => {
var base;

Expand Down Expand Up @@ -152,13 +152,13 @@ RedisDatastore = class RedisDatastore {
var _this3 = this;

return _asyncToGenerator(function* () {
if (!(name === "init" || name === "heartbeat")) {
if (!(name === "init" || name === "register_client")) {
yield _this3.ready;
}

return new _this3.Promise((resolve, reject) => {
var all_args, arr;
all_args = [Date.now(), _this3.clientId, _this3.instance.queued()].concat(args);
all_args = [Date.now(), _this3.clientId].concat(args);

_this3.instance.Events.trigger("debug", `Calling Redis script: ${name}.lua`, all_args);

Expand All @@ -171,12 +171,18 @@ RedisDatastore = class RedisDatastore {
});
return _this3.connection.__scriptFn__(name)(...arr);
}).catch(e => {
if (e.message === "SETTINGS_KEY_NOT_FOUND" && name !== "heartbeat") {
return _this3.runScript("init", _this3.prepareInitSettings(false)).then(() => {
if (e.message === "SETTINGS_KEY_NOT_FOUND") {
if (name === "heartbeat") {
return _this3.Promise.resolve();
} else {
return _this3.runScript("init", _this3.prepareInitSettings(false)).then(() => {
return _this3.runScript(name, args);
});
}
} else if (e.message === "UNKNOWN_CLIENT") {
return _this3.runScript("register_client", [_this3.instance.queued()]).then(() => {
return _this3.runScript(name, args);
});
} else if (name === "heartbeat") {
return _this3.Promise.resolve();
} else {
return _this3.Promise.reject(e);
}
Expand Down Expand Up @@ -213,7 +219,8 @@ RedisDatastore = class RedisDatastore {
args = this.prepareObject(Object.assign({}, this.storeOptions, {
id: this.originalId,
version: this.instance.version,
groupTimeout: this.timeout
groupTimeout: this.timeout,
clientTimeout: this.clientTimeout
}));
args.unshift(clear ? 1 : 0, this.instance.version);
return args;
Expand Down
31 changes: 19 additions & 12 deletions lib/Scripts.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ lua = require("./lua.json");
headers = {
refs: lua["refs.lua"],
validate_keys: lua["validate_keys.lua"],
validate_client: lua["validate_client.lua"],
refresh_expiration: lua["refresh_expiration.lua"],
process_tick: lua["process_tick.lua"],
conditions_check: lua["conditions_check.lua"],
Expand Down Expand Up @@ -67,75 +68,81 @@ templates = {
refresh_expiration: false,
code: lua["group_check.lua"]
},
blacklist_client: {
register_client: {
keys: exports.allKeys,
headers: ["validate_keys"],
refresh_expiration: false,
code: lua["register_client.lua"]
},
blacklist_client: {
keys: exports.allKeys,
headers: ["validate_keys", "validate_client"],
refresh_expiration: false,
code: lua["blacklist_client.lua"]
},
heartbeat: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
headers: ["validate_keys", "validate_client", "process_tick"],
refresh_expiration: false,
code: lua["heartbeat.lua"]
},
update_settings: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
headers: ["validate_keys", "validate_client", "process_tick"],
refresh_expiration: true,
code: lua["update_settings.lua"]
},
running: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
headers: ["validate_keys", "validate_client", "process_tick"],
refresh_expiration: false,
code: lua["running.lua"]
},
queued: {
keys: exports.allKeys,
headers: ["validate_keys"],
headers: ["validate_keys", "validate_client"],
refresh_expiration: false,
code: lua["queued.lua"]
},
done: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
headers: ["validate_keys", "validate_client", "process_tick"],
refresh_expiration: false,
code: lua["done.lua"]
},
check: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick", "conditions_check"],
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"],
refresh_expiration: false,
code: lua["check.lua"]
},
submit: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick", "conditions_check"],
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"],
refresh_expiration: true,
code: lua["submit.lua"]
},
register: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick", "conditions_check"],
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"],
refresh_expiration: true,
code: lua["register.lua"]
},
free: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
headers: ["validate_keys", "validate_client", "process_tick"],
refresh_expiration: true,
code: lua["free.lua"]
},
current_reservoir: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
headers: ["validate_keys", "validate_client", "process_tick"],
refresh_expiration: false,
code: lua["current_reservoir.lua"]
},
increment_reservoir: {
keys: exports.allKeys,
headers: ["validate_keys", "process_tick"],
headers: ["validate_keys", "validate_client", "process_tick"],
refresh_expiration: true,
code: lua["increment_reservoir.lua"]
}
Expand Down
14 changes: 8 additions & 6 deletions lib/lua.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions light.js
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,7 @@
Promise: Promise,
timeout: null,
heartbeatInterval: 5000,
clientTimeout: 10000,
clientOptions: {},
clusterNodes: null,
clearDatastore: false,
Expand Down
1 change: 1 addition & 0 deletions src/Bottleneck.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Bottleneck
Promise: Promise
timeout: null
heartbeatInterval: 5000
clientTimeout: 10000
clientOptions: {}
clusterNodes: null
clearDatastore: false
Expand Down
21 changes: 13 additions & 8 deletions src/RedisDatastore.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class RedisDatastore
@ready = @connection.ready
.then (@clients) => @runScript "init", @prepareInitSettings @clearDatastore
.then => @connection.__addLimiter__ @instance
.then => @runScript "heartbeat", []
.then => @runScript "register_client", [@instance.queued()]
.then =>
(@heartbeat = setInterval =>
@runScript "heartbeat", []
Expand Down Expand Up @@ -72,19 +72,23 @@ class RedisDatastore
@connection.disconnect flush

runScript: (name, args) ->
await @ready unless name == "init" or name == "heartbeat"
await @ready unless name == "init" or name == "register_client"
new @Promise (resolve, reject) =>
all_args = [Date.now(), @clientId, @instance.queued()].concat args
all_args = [Date.now(), @clientId].concat args
@instance.Events.trigger "debug", "Calling Redis script: #{name}.lua", all_args
arr = @connection.__scriptArgs__ name, @originalId, all_args, (err, replies) ->
if err? then return reject err
return resolve replies
@connection.__scriptFn__(name) arr...
.catch (e) =>
if e.message == "SETTINGS_KEY_NOT_FOUND" and name != "heartbeat"
@runScript("init", @prepareInitSettings(false))
if e.message == "SETTINGS_KEY_NOT_FOUND"
if name == "heartbeat" then @Promise.resolve()
else
@runScript("init", @prepareInitSettings(false))
.then => @runScript(name, args)
else if e.message == "UNKNOWN_CLIENT"
@runScript("register_client", [@instance.queued()])
.then => @runScript(name, args)
else if name == "heartbeat" then @Promise.resolve()
else @Promise.reject e

prepareArray: (arr) -> (if x? then x.toString() else "") for x in arr
Expand All @@ -96,9 +100,10 @@ class RedisDatastore

prepareInitSettings: (clear) ->
args = @prepareObject Object.assign({}, @storeOptions, {
id: @originalId,
version: @instance.version,
id: @originalId
version: @instance.version
groupTimeout: @timeout
@clientTimeout
})
args.unshift (if clear then 1 else 0), @instance.version
args
Expand Down
30 changes: 18 additions & 12 deletions src/Scripts.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ lua = require "./lua.json"
headers =
refs: lua["refs.lua"]
validate_keys: lua["validate_keys.lua"]
validate_client: lua["validate_client.lua"]
refresh_expiration: lua["refresh_expiration.lua"]
process_tick: lua["process_tick.lua"]
conditions_check: lua["conditions_check.lua"]
Expand Down Expand Up @@ -68,64 +69,69 @@ templates =
headers: []
refresh_expiration: false
code: lua["group_check.lua"]
blacklist_client:
register_client:
keys: exports.allKeys
headers: ["validate_keys"]
refresh_expiration: false
code: lua["register_client.lua"]
blacklist_client:
keys: exports.allKeys
headers: ["validate_keys", "validate_client"]
refresh_expiration: false
code: lua["blacklist_client.lua"]
heartbeat:
keys: exports.allKeys
headers: ["validate_keys", "process_tick"]
headers: ["validate_keys", "validate_client", "process_tick"]
refresh_expiration: false
code: lua["heartbeat.lua"]
update_settings:
keys: exports.allKeys
headers: ["validate_keys", "process_tick"]
headers: ["validate_keys", "validate_client", "process_tick"]
refresh_expiration: true
code: lua["update_settings.lua"]
running:
keys: exports.allKeys
headers: ["validate_keys", "process_tick"]
headers: ["validate_keys", "validate_client", "process_tick"]
refresh_expiration: false
code: lua["running.lua"]
queued:
keys: exports.allKeys
headers: ["validate_keys"]
headers: ["validate_keys", "validate_client"]
refresh_expiration: false
code: lua["queued.lua"]
done:
keys: exports.allKeys
headers: ["validate_keys", "process_tick"]
headers: ["validate_keys", "validate_client", "process_tick"]
refresh_expiration: false
code: lua["done.lua"]
check:
keys: exports.allKeys
headers: ["validate_keys", "process_tick", "conditions_check"]
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"]
refresh_expiration: false
code: lua["check.lua"]
submit:
keys: exports.allKeys
headers: ["validate_keys", "process_tick", "conditions_check"]
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"]
refresh_expiration: true
code: lua["submit.lua"]
register:
keys: exports.allKeys
headers: ["validate_keys", "process_tick", "conditions_check"]
headers: ["validate_keys", "validate_client", "process_tick", "conditions_check"]
refresh_expiration: true
code: lua["register.lua"]
free:
keys: exports.allKeys
headers: ["validate_keys", "process_tick"]
headers: ["validate_keys", "validate_client", "process_tick"]
refresh_expiration: true
code: lua["free.lua"]
current_reservoir:
keys: exports.allKeys
headers: ["validate_keys", "process_tick"]
headers: ["validate_keys", "validate_client", "process_tick"]
refresh_expiration: false
code: lua["current_reservoir.lua"]
increment_reservoir:
keys: exports.allKeys
headers: ["validate_keys", "process_tick"]
headers: ["validate_keys", "validate_client", "process_tick"]
refresh_expiration: true
code: lua["increment_reservoir.lua"]

Expand Down
5 changes: 4 additions & 1 deletion src/redis/blacklist_client.lua
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
local blacklist = ARGV[num_static_argv + 1]

redis.call('zadd', client_last_seen_key, 0, blacklist)
if redis.call('zscore', client_last_seen_key, blacklist) then
redis.call('zadd', client_last_seen_key, 0, blacklist)
end


return {}
15 changes: 8 additions & 7 deletions src/redis/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ if redis.call('exists', settings_key) == 0 then
'capacityPriorityCounter', 0
)

if clear == 1 then
-- Force all connected clients to re-register
process_tick(now, true)
end

else
-- Apply migrations
local settings = redis.call('hmget', settings_key,
Expand All @@ -54,7 +49,7 @@ else
end

-- 2.11.1
if version_digits[2] < 11 and version_digits[3] < 1 then
if version_digits[2] < 11 or (version_digits[2] == 11 and version_digits[3] < 1) then
if redis.call('hstrlen', settings_key, 'lastReservoirRefresh') == 0 then
redis.call('hmset', settings_key,
'lastReservoirRefresh', now,
Expand All @@ -78,11 +73,17 @@ else
end

-- 2.15.2
if version_digits[2] < 15 and version_digits[3] < 2 then
if version_digits[2] < 15 or (version_digits[2] == 15 and version_digits[3] < 2) then
redis.call('hsetnx', settings_key, 'capacityPriorityCounter', 0)
redis.call('hset', settings_key, 'version', '2.15.2')
end

-- 2.17.0
if version_digits[2] < 17 then
redis.call('hsetnx', settings_key, 'clientTimeout', 10000)
redis.call('hset', settings_key, 'version', '2.17.0')
end

end

process_tick(now, false)
Expand Down
Loading

0 comments on commit 71debfb

Please sign in to comment.