lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-4.4.0 vs lib/sidekiq/limit_fetch/global/monitor.rb in sidekiq-limit_fetch-4.4.1

- old
+ new

@@ -1,77 +1,83 @@ -module Sidekiq::LimitFetch::Global - module Monitor - extend self +# frozen_string_literal: true - HEARTBEAT_PREFIX = 'limit:heartbeat:' - PROCESS_SET = 'limit:processes' - HEARTBEAT_TTL = 20 - REFRESH_TIMEOUT = 5 +module Sidekiq + module LimitFetch + module Global + module Monitor + extend self - def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) - Thread.new do - loop do - Sidekiq::LimitFetch.redis_retryable do - handle_dynamic_queues - update_heartbeat ttl - invalidate_old_processes + HEARTBEAT_PREFIX = 'limit:heartbeat:' + PROCESS_SET = 'limit:processes' + HEARTBEAT_TTL = 20 + REFRESH_TIMEOUT = 5 + + def start!(ttl = HEARTBEAT_TTL, timeout = REFRESH_TIMEOUT) + Thread.new do + loop do + Sidekiq::LimitFetch.redis_retryable do + handle_dynamic_queues + update_heartbeat ttl + invalidate_old_processes + end + + sleep timeout + end end + end - sleep timeout + def all_processes + Sidekiq.redis { |it| it.smembers PROCESS_SET } end - end - end - def all_processes - Sidekiq.redis {|it| it.smembers PROCESS_SET } - end + def old_processes + all_processes.reject do |process| + Sidekiq.redis { |it| it.get heartbeat_key process } == '1' + end + end - def old_processes - all_processes.reject do |process| - Sidekiq.redis {|it| it.get heartbeat_key process } == '1' - end - end + def remove_old_processes! + Sidekiq.redis do |it| + old_processes.each { |process| it.srem PROCESS_SET, [process] } + end + end - def remove_old_processes! - Sidekiq.redis do |it| - old_processes.each {|process| it.srem PROCESS_SET, [process] } - end - end + def handle_dynamic_queues + queues = Sidekiq::LimitFetch::Queues + return unless queues.dynamic? - def handle_dynamic_queues - queues = Sidekiq::LimitFetch::Queues - return unless queues.dynamic? + available_queues = Sidekiq::Queue.all.map(&:name).reject do |it| + queues.dynamic_exclude.include? it + end + queues.handle available_queues + end - available_queues = Sidekiq::Queue.all.map(&:name).reject do |it| - queues.dynamic_exclude.include? it - end - queues.handle available_queues - end + private - private - - def update_heartbeat(ttl) - Sidekiq.redis do |it| - it.multi do |pipeline| - pipeline.set heartbeat_key, '1' - pipeline.sadd PROCESS_SET, [Selector.uuid] - pipeline.expire heartbeat_key, ttl + def update_heartbeat(ttl) + Sidekiq.redis do |it| + it.multi do |pipeline| + pipeline.set heartbeat_key, '1' + pipeline.sadd PROCESS_SET, [Selector.uuid] + pipeline.expire heartbeat_key, ttl + end + end end - end - end - def invalidate_old_processes - Sidekiq.redis do |it| - remove_old_processes! - processes = all_processes + def invalidate_old_processes + Sidekiq.redis do |_it| + remove_old_processes! + processes = all_processes - Sidekiq::Queue.instances.each do |queue| - queue.remove_locks_except! processes + Sidekiq::Queue.instances.each do |queue| + queue.remove_locks_except! processes + end + end end - end - end - def heartbeat_key(process=Selector.uuid) - HEARTBEAT_PREFIX + process + def heartbeat_key(process = Selector.uuid) + HEARTBEAT_PREFIX + process + end + end end end end