lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-2.0.2 vs lib/sidekiq/limit_fetch/global/selector.rb in sidekiq-limit_fetch-2.1.0

- old
+ new

@@ -46,38 +46,53 @@ local namespace = table.remove(ARGV, 1)..'limit_fetch:' local worker_name = table.remove(ARGV, 1) local queues = ARGV local available = {} local unblocked = {} - local queue_locks + local locks + local process_locks local blocking_mode for _, queue in ipairs(queues) do if not blocking_mode or unblocked[queue] then local probed_key = namespace..'probed:'..queue local pause_key = namespace..'pause:'..queue local paused = redis.call('get', pause_key) if not paused then - local limit_key = namespace..'limit:'..queue - local queue_limit = tonumber(redis.call('get', limit_key)) + local limit_key = namespace..'limit:'..queue + local limit = tonumber(redis.call('get', limit_key)) + local process_limit_key = namespace..'process_limit:'..queue + local process_limit = tonumber(redis.call('get', process_limit_key)) + local block_key = namespace..'block:'..queue local can_block = redis.call('get', block_key) - if can_block or queue_limit then - queue_locks = redis.call('llen', probed_key) + if can_block or limit then + locks = redis.call('llen', probed_key) end - blocking_mode = can_block and queue_locks > 0 + if process_limit then + local all_locks = redis.call('lrange', probed_key, 0, -1) + process_locks = 0 + for _, process in ipairs(all_locks) do + if process == worker_name then + process_locks = process_locks + 1 + end + end + end + blocking_mode = can_block and locks > 0 + if blocking_mode and can_block ~= 'true' then for unblocked_queue in string.gmatch(can_block, "[^,]+") do unblocked[unblocked_queue] = true end end - if not queue_limit or queue_limit > queue_locks then + if (not limit or limit > locks) and + (not process_limit or process_limit > process_locks) then redis.call('rpush', probed_key, worker_name) table.insert(available, queue) end end end