Sha256: 82bf6b4fd7705c3cb6f1a3c241608cd397ec97224b601efa243387367b37950b
Contents?: true
Size: 1.19 KB
Versions: 3
Compression:
Stored size: 1.19 KB
Contents
module Sidekiq::LimitFetch::Global
  # Background monitor that refreshes this process's heartbeat key in Redis
  # and removes the registration of processors whose heartbeat key is gone.
  module Monitor
    extend self

    # Key prefixes; the processor identifier is appended to each.
    HEARTBEAT_NAMESPACE = 'heartbeat:'
    PROCESSOR_NAMESPACE = 'processor:'

    # Default expiry (seconds) applied to the heartbeat key.
    HEARTBEAT_TTL = 90
    # Default pause (seconds) between two monitor cycles.
    REFRESH_TIMEOUT = 60

    # Starts the monitor loop in a new thread and returns that thread.
    #
    # ttl     - expiry in seconds given to the heartbeat key on every cycle.
    # timeout - seconds to sleep between cycles; it should stay below +ttl+,
    #           otherwise the heartbeat expires between two refreshes.
    def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
      Thread.new do
        loop do
          update_heartbeat ttl
          invalidate_old_processors
          sleep timeout
        end
      end
    end

    private

    # Writes the heartbeat key (with expiry) and the processor key for the
    # current processor.
    def update_heartbeat(ttl)
      Sidekiq.redis do |redis|
        # SETEX sets value and expiry in one command, so the heartbeat can
        # never be left behind without a TTL if the process dies between a
        # separate SET and EXPIRE.
        redis.setex heartbeat_key, ttl, true
        # The processor key is written after the heartbeat so another monitor
        # never observes a registered processor that has no heartbeat yet.
        redis.set processor_key, true
      end
    end

    # For every registered processor without a heartbeat key: deletes its
    # processor key and removes its identifier from every
    # 'limit_fetch:busy:*' list.
    def invalidate_old_processors
      Sidekiq.redis do |redis|
        busy_queues = nil

        redis.keys(PROCESSOR_NAMESPACE + '*').each do |key|
          # Non-mutating strip of the prefix; the string returned by the
          # client is left untouched.
          processor = key.sub(PROCESSOR_NAMESPACE, '')
          next if redis.get(heartbeat_key(processor))

          redis.del(processor_key(processor))

          # Fetch the busy lists lazily and at most once per pass instead of
          # issuing a KEYS scan for every dead processor.
          busy_queues ||= redis.keys('limit_fetch:busy:*')
          busy_queues.each do |queue|
            redis.lrem queue, 0, processor
          end
        end
      end
    end

    # Heartbeat key for +processor+ (defaults to the current Selector.uuid).
    def heartbeat_key(processor=Selector.uuid)
      HEARTBEAT_NAMESPACE + processor
    end

    # Registration key for +processor+ (defaults to the current Selector.uuid).
    def processor_key(processor=Selector.uuid)
      PROCESSOR_NAMESPACE + processor
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems