lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-2.1.1 vs lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-2.1.2

- old
+ new

@@ -1,57 +1,57 @@ module Sidekiq::LimitFetch::Global module Monitor extend self - HEARTBEAT_NAMESPACE = 'heartbeat:' - PROCESSOR_NAMESPACE = 'processor:' - - HEARTBEAT_TTL = 18 + HEARTBEAT_PREFIX = 'heartbeat:' + PROCESS_SET = 'processes' + HEARTBEAT_TTL = 18 REFRESH_TIMEOUT = 10 def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) Thread.new do loop do update_heartbeat ttl - invalidate_old_processors + invalidate_old_processes sleep timeout end end end private def update_heartbeat(ttl) Sidekiq.redis do |it| it.pipelined do - it.set processor_key, true + it.sadd PROCESS_SET, Selector.uuid it.set heartbeat_key, true it.expire heartbeat_key, ttl end end end - def invalidate_old_processors + def invalidate_old_processes Sidekiq.redis do |it| - it.keys(PROCESSOR_NAMESPACE + '*').each do |processor| - processor.sub! PROCESSOR_NAMESPACE, '' - next if it.get heartbeat_key processor + processes = it.smembers PROCESS_SET + processes.each do |process| + unless it.get heartbeat_key process + processes.delete process + it.srem PROCESS_SET, process + end + end - it.del processor_key processor - %w(limit_fetch:probed:* limit_fetch:busy:*).each do |pattern| - it.keys(pattern).each do |queue| - it.lrem queue, 0, processor + Sidekiq::Queue.instances.map(&:name).uniq.each do |queue| + locks = it.lrange "limit_fetch:probed:#{queue}", 0, -1 + (locks.uniq - processes).each do |dead_process| + %w(limit_fetch:probed: limit_fetch:busy:).each do |prefix| + it.lrem prefix + queue, 0, dead_process end end end end end - def heartbeat_key(processor=Selector.uuid) - HEARTBEAT_NAMESPACE + processor - end - - def processor_key(processor=Selector.uuid) - PROCESSOR_NAMESPACE + processor + def heartbeat_key(process=Selector.uuid) + HEARTBEAT_PREFIX + process end end end