Sha256: 51cdb19ceb600dd10d89ac9d0beef082789fbf4fe7a8b6a45dbd0cb6a32aae58
Contents?: true
Size: 1.31 KB
Versions: 6
Compression:
Stored size: 1.31 KB
Contents
module Sidekiq::LimitFetch::Global module Monitor extend self HEARTBEAT_NAMESPACE = 'heartbeat:' PROCESSOR_NAMESPACE = 'processor:' HEARTBEAT_TTL = 18 REFRESH_TIMEOUT = 10 def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) Thread.new do loop do update_heartbeat ttl invalidate_old_processors sleep timeout end end end private def update_heartbeat(ttl) Sidekiq.redis do |it| it.pipelined do it.set processor_key, true it.set heartbeat_key, true it.expire heartbeat_key, ttl end end end def invalidate_old_processors Sidekiq.redis do |it| it.keys(PROCESSOR_NAMESPACE + '*').each do |processor| processor.sub! PROCESSOR_NAMESPACE, '' next if it.get heartbeat_key processor it.del processor_key processor %w(limit_fetch:probed:* limit_fetch:busy:*).each do |pattern| it.keys(pattern).each do |queue| it.lrem queue, 0, processor end end end end end def heartbeat_key(processor=Selector.uuid) HEARTBEAT_NAMESPACE + processor end def processor_key(processor=Selector.uuid) PROCESSOR_NAMESPACE + processor end end end
Version data entries
6 entries across 6 versions & 1 rubygems