lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-2.0.2 vs lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-2.1.0
- old
+ new
@@ -46,38 +46,53 @@
local namespace = table.remove(ARGV, 1)..'limit_fetch:'
local worker_name = table.remove(ARGV, 1)
local queues = ARGV
local available = {}
local unblocked = {}
- local queue_locks
+ local locks
+ local process_locks
local blocking_mode
for _, queue in ipairs(queues) do
if not blocking_mode or unblocked[queue] then
local probed_key = namespace..'probed:'..queue
local pause_key = namespace..'pause:'..queue
local paused = redis.call('get', pause_key)
if not paused then
- local limit_key = namespace..'limit:'..queue
- local queue_limit = tonumber(redis.call('get', limit_key))
+ local limit_key = namespace..'limit:'..queue
+ local limit = tonumber(redis.call('get', limit_key))
+ local process_limit_key = namespace..'process_limit:'..queue
+ local process_limit = tonumber(redis.call('get', process_limit_key))
+
local block_key = namespace..'block:'..queue
local can_block = redis.call('get', block_key)
- if can_block or queue_limit then
- queue_locks = redis.call('llen', probed_key)
+ if can_block or limit then
+ locks = redis.call('llen', probed_key)
end
- blocking_mode = can_block and queue_locks > 0
+ if process_limit then
+ local all_locks = redis.call('lrange', probed_key, 0, -1)
+ process_locks = 0
+ for _, process in ipairs(all_locks) do
+ if process == worker_name then
+ process_locks = process_locks + 1
+ end
+ end
+ end
+ blocking_mode = can_block and locks > 0
+
if blocking_mode and can_block ~= 'true' then
for unblocked_queue in string.gmatch(can_block, "[^,]+") do
unblocked[unblocked_queue] = true
end
end
- if not queue_limit or queue_limit > queue_locks then
+ if (not limit or limit > locks) and
+ (not process_limit or process_limit > process_locks) then
redis.call('rpush', probed_key, worker_name)
table.insert(available, queue)
end
end
end