lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-2.1.1 vs lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-2.1.2
- old
+ new
@@ -1,57 +1,57 @@
module Sidekiq::LimitFetch::Global
module Monitor
extend self
- HEARTBEAT_NAMESPACE = 'heartbeat:'
- PROCESSOR_NAMESPACE = 'processor:'
-
- HEARTBEAT_TTL = 18
+ HEARTBEAT_PREFIX = 'heartbeat:'
+ PROCESS_SET = 'processes'
+ HEARTBEAT_TTL = 18
REFRESH_TIMEOUT = 10
def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
Thread.new do
loop do
update_heartbeat ttl
- invalidate_old_processors
+ invalidate_old_processes
sleep timeout
end
end
end
private
def update_heartbeat(ttl)
Sidekiq.redis do |it|
it.pipelined do
- it.set processor_key, true
+ it.sadd PROCESS_SET, Selector.uuid
it.set heartbeat_key, true
it.expire heartbeat_key, ttl
end
end
end
- def invalidate_old_processors
+ def invalidate_old_processes
Sidekiq.redis do |it|
- it.keys(PROCESSOR_NAMESPACE + '*').each do |processor|
- processor.sub! PROCESSOR_NAMESPACE, ''
- next if it.get heartbeat_key processor
+ processes = it.smembers PROCESS_SET
+ processes.each do |process|
+ unless it.get heartbeat_key process
+ processes.delete process
+ it.srem PROCESS_SET, process
+ end
+ end
- it.del processor_key processor
- %w(limit_fetch:probed:* limit_fetch:busy:*).each do |pattern|
- it.keys(pattern).each do |queue|
- it.lrem queue, 0, processor
+ Sidekiq::Queue.instances.map(&:name).uniq.each do |queue|
+ locks = it.lrange "limit_fetch:probed:#{queue}", 0, -1
+ (locks.uniq - processes).each do |dead_process|
+ %w(limit_fetch:probed: limit_fetch:busy:).each do |prefix|
+ it.lrem prefix + queue, 0, dead_process
end
end
end
end
end
- def heartbeat_key(processor=Selector.uuid)
- HEARTBEAT_NAMESPACE + processor
- end
-
- def processor_key(processor=Selector.uuid)
- PROCESSOR_NAMESPACE + processor
+ def heartbeat_key(process=Selector.uuid)
+ HEARTBEAT_PREFIX + process
end
end
end