Sha256: a5fb2ea3020ec2e6106999dc7d9d45f745a1675e34e78ef000716f881eeb42e3
Contents?: true
Size: 1.23 KB
Versions: 1
Compression:
Stored size: 1.23 KB
Contents
module Sidekiq::LimitFetch::Global
  module Monitor
    extend self

    HEARTBEAT_NAMESPACE = 'heartbeat:'
    PROCESSOR_NAMESPACE = 'processor:'

    HEARTBEAT_TTL   = 18
    REFRESH_TIMEOUT = 10

    def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
      Thread.new do
        loop do
          update_heartbeat ttl
          invalidate_old_processors
          sleep timeout
        end
      end
    end

    private

    def update_heartbeat(ttl)
      Sidekiq.redis do |it|
        it.pipelined do
          it.set processor_key, true
          it.set heartbeat_key, true
          it.expire heartbeat_key, ttl
        end
      end
    end

    def invalidate_old_processors
      Sidekiq.redis do |it|
        it.keys(PROCESSOR_NAMESPACE + '*').each do |processor|
          processor.sub! PROCESSOR_NAMESPACE, ''
          next if it.get heartbeat_key processor

          it.del processor_key processor
          it.keys('limit_fetch:busy:*').each do |queue|
            it.lrem queue, 0, processor
          end
        end
      end
    end

    def heartbeat_key(processor=Selector.uuid)
      HEARTBEAT_NAMESPACE + processor
    end

    def processor_key(processor=Selector.uuid)
      PROCESSOR_NAMESPACE + processor
    end
  end
end
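
The monitor above combines two ideas: each process keeps refreshing a heartbeat key that expires after `HEARTBEAT_TTL` seconds, and each process also sweeps away the registration and busy-list entries of any processor whose heartbeat has lapsed. The following is a minimal sketch of that pattern which runs without Redis. `FakeStore`, `beat`, `sweep`, `run_demo`, and the `rpush`/`list` helpers are hypothetical names introduced only for this illustration; they are not part of sidekiq-limit_fetch, and the in-memory store only approximates the few Redis commands the monitor calls.

```ruby
# Hypothetical in-memory stand-in for the Redis commands used by the monitor.
class FakeStore
  def initialize(clock)
    @clock = clock     # callable returning the current time in seconds
    @values = {}       # key => string value or Array (list)
    @deadlines = {}    # key => absolute expiry time
  end

  def set(key, value)
    @values[key] = value.to_s
    @deadlines.delete(key) # a plain SET clears any existing TTL
  end

  def get(key)
    purge(key)
    @values[key]
  end

  def expire(key, ttl)
    @deadlines[key] = @clock.call + ttl if @values.key?(key)
  end

  def del(key)
    @deadlines.delete(key)
    @values.delete(key)
  end

  # Only supports patterns of the form "prefix*", which is all the monitor uses.
  def keys(pattern)
    prefix = pattern.chomp('*')
    @values.keys.each { |k| purge(k) }
    @values.keys.select { |k| k.start_with?(prefix) }
  end

  def rpush(key, value)
    (@values[key] ||= []) << value
  end

  # count is ignored: like LREM with count 0, every match is removed.
  def lrem(key, _count, value)
    @values[key].delete(value) if @values[key].is_a?(Array)
  end

  def list(key)
    @values.fetch(key, [])
  end

  private

  def purge(key)
    deadline = @deadlines[key]
    return unless deadline && @clock.call >= deadline
    @values.delete(key)
    @deadlines.delete(key)
  end
end

# Mirrors update_heartbeat: register the processor and refresh its heartbeat.
def beat(store, uuid, ttl)
  store.set("processor:#{uuid}", true)
  store.set("heartbeat:#{uuid}", true)
  store.expire("heartbeat:#{uuid}", ttl)
end

# Mirrors invalidate_old_processors: drop processors with no live heartbeat.
def sweep(store)
  store.keys('processor:*').each do |key|
    uuid = key.sub('processor:', '')
    next if store.get("heartbeat:#{uuid}")

    store.del("processor:#{uuid}")
    store.keys('limit_fetch:busy:*').each { |queue| store.lrem(queue, 0, uuid) }
  end
end

def run_demo
  now = 0
  store = FakeStore.new(-> { now })
  %w[alive dead].each do |uuid|
    beat(store, uuid, 18)
    store.rpush('limit_fetch:busy:default', uuid)
  end

  now = 10
  beat(store, 'alive', 18) # only "alive" refreshes; its heartbeat now expires at t=28
  now = 20                 # the "dead" heartbeat expired at t=18
  sweep(store)

  { processors: store.keys('processor:*'), busy: store.list('limit_fetch:busy:default') }
end

p run_demo
# => {:processors=>["processor:alive"], :busy=>["alive"]} (hash inspect format varies by Ruby version)
```

Because the refresh interval (10 seconds) is shorter than the heartbeat TTL (18 seconds), a healthy process always renews its key before it expires, while a process that stops refreshing is cleaned up by the next sweep that runs after its TTL lapses.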
Version data entries
1 entry across 1 version and 1 rubygem
Version | Path |
---|---|
sidekiq-limit_fetch-1.6 | lib/sidekiq/limit_fetch/global/monitor.rb |