lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-2.4.2 vs lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-3.0.0
- old
+ new
@@ -1,43 +1,43 @@
module Sidekiq::LimitFetch::Global
module Monitor
- include Sidekiq::LimitFetch::Redis
extend self
HEARTBEAT_PREFIX = 'limit:heartbeat:'
PROCESS_SET = 'limit:processes'
HEARTBEAT_TTL = 20
REFRESH_TIMEOUT = 5
- def start!(queues, ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
+ def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
Thread.new do
loop do
- add_dynamic queues if queues.dynamic?
+ add_dynamic_queues
update_heartbeat ttl
invalidate_old_processes
sleep timeout
end
end
end
def all_processes
- redis {|it| it.smembers PROCESS_SET }
+ Sidekiq.redis {|it| it.smembers PROCESS_SET }
end
def old_processes
all_processes.reject do |process|
- redis {|it| it.get heartbeat_key process }
+ Sidekiq.redis {|it| it.get heartbeat_key process }
end
end
def remove_old_processes!
- redis do |it|
+ Sidekiq.redis do |it|
old_processes.each {|process| it.srem PROCESS_SET, process }
end
end
- def add_dynamic(queues)
- queues.add Sidekiq::Queue.all.map(&:name)
+ def add_dynamic_queues
+ queues = Sidekiq::LimitFetch::Queues
+ queues.add Sidekiq::Queue.all.map(&:name) if queues.dynamic?
end
private
def update_heartbeat(ttl)