lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-4.4.0 vs lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-4.4.1

- old
+ new

@@ -1,138 +1,143 @@ -module Sidekiq::LimitFetch::Global - class Semaphore - PREFIX = 'limit_fetch' +# frozen_string_literal: true - attr_reader :local_busy +module Sidekiq + module LimitFetch + module Global + class Semaphore + PREFIX = 'limit_fetch' - def initialize(name) - @name = name - @lock = Mutex.new - @local_busy = 0 - end + attr_reader :local_busy - def limit - value = redis {|it| it.get "#{PREFIX}:limit:#@name" } - value.to_i if value - end + def initialize(name) + @name = name + @lock = Mutex.new + @local_busy = 0 + end - def limit=(value) - @limit_changed = true + def limit + value = redis { |it| it.get "#{PREFIX}:limit:#{@name}" } + value&.to_i + end - if value - redis {|it| it.set "#{PREFIX}:limit:#@name", value } - else - redis {|it| it.del "#{PREFIX}:limit:#@name" } - end - end + def limit=(value) + @limit_changed = true - def limit_changed? - @limit_changed - end + if value + redis { |it| it.set "#{PREFIX}:limit:#{@name}", value } + else + redis { |it| it.del "#{PREFIX}:limit:#{@name}" } + end + end - def process_limit - value = redis {|it| it.get "#{PREFIX}:process_limit:#@name" } - value.to_i if value - end + def limit_changed? + @limit_changed + end - def process_limit=(value) - if value - redis {|it| it.set "#{PREFIX}:process_limit:#@name", value } - else - redis {|it| it.del "#{PREFIX}:process_limit:#@name" } - end - end + def process_limit + value = redis { |it| it.get "#{PREFIX}:process_limit:#{@name}" } + value&.to_i + end - def acquire - Selector.acquire([@name], namespace).size > 0 - end + def process_limit=(value) + if value + redis { |it| it.set "#{PREFIX}:process_limit:#{@name}", value } + else + redis { |it| it.del "#{PREFIX}:process_limit:#{@name}" } + end + end - def release - redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid } - end + def acquire + Selector.acquire([@name], namespace).size.positive? + end - def busy - redis {|it| it.llen "#{PREFIX}:busy:#@name" } - end + def release + redis { |it| it.lrem "#{PREFIX}:probed:#{@name}", 1, Selector.uuid } + end - def busy_processes - redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 } - end + def busy + redis { |it| it.llen "#{PREFIX}:busy:#{@name}" } + end - def increase_busy - increase_local_busy - redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid } - end + def busy_processes + redis { |it| it.lrange "#{PREFIX}:busy:#{@name}", 0, -1 } + end - def decrease_busy - decrease_local_busy - redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid } - end + def increase_busy + increase_local_busy + redis { |it| it.rpush "#{PREFIX}:busy:#{@name}", Selector.uuid } + end - def probed - redis {|it| it.llen "#{PREFIX}:probed:#@name" } - end + def decrease_busy + decrease_local_busy + redis { |it| it.lrem "#{PREFIX}:busy:#{@name}", 1, Selector.uuid } + end - def probed_processes - redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 } - end + def probed + redis { |it| it.llen "#{PREFIX}:probed:#{@name}" } + end - def pause - redis {|it| it.set "#{PREFIX}:pause:#@name", '1' } - end + def probed_processes + redis { |it| it.lrange "#{PREFIX}:probed:#{@name}", 0, -1 } + end - def pause_for_ms ms - redis {|it| it.psetex "#{PREFIX}:pause:#@name", ms, 1 } - end + def pause + redis { |it| it.set "#{PREFIX}:pause:#{@name}", '1' } + end - def unpause - redis {|it| it.del "#{PREFIX}:pause:#@name" } - end + def pause_for_ms(milliseconds) + redis { |it| it.psetex "#{PREFIX}:pause:#{@name}", milliseconds, 1 } + end - def paused? - redis {|it| it.get "#{PREFIX}:pause:#@name" } == '1' - end + def unpause + redis { |it| it.del "#{PREFIX}:pause:#{@name}" } + end - def block - redis {|it| it.set "#{PREFIX}:block:#@name", '1' } - end + def paused? + redis { |it| it.get "#{PREFIX}:pause:#{@name}" } == '1' + end - def block_except(*queues) - raise ArgumentError if queues.empty? - redis {|it| it.set "#{PREFIX}:block:#@name", queues.join(',') } - end + def block + redis { |it| it.set "#{PREFIX}:block:#{@name}", '1' } + end - def unblock - redis {|it| it.del "#{PREFIX}:block:#@name" } - end + def block_except(*queues) + raise ArgumentError if queues.empty? - def blocking? - redis {|it| it.get "#{PREFIX}:block:#@name" } == '1' - end + redis { |it| it.set "#{PREFIX}:block:#{@name}", queues.join(',') } + end - def clear_limits - redis do |it| - %w(block busy limit pause probed process_limit).each do |key| - it.del "#{PREFIX}:#{key}:#@name" + def unblock + redis { |it| it.del "#{PREFIX}:block:#{@name}" } end - end - end - def increase_local_busy - @lock.synchronize { @local_busy += 1 } - end + def blocking? + redis { |it| it.get "#{PREFIX}:block:#{@name}" } == '1' + end - def decrease_local_busy - @lock.synchronize { @local_busy -= 1 } - end + def clear_limits + redis do |it| + %w[block busy limit pause probed process_limit].each do |key| + it.del "#{PREFIX}:#{key}:#{@name}" + end + end + end - def local_busy? - @local_busy > 0 - end + def increase_local_busy + @lock.synchronize { @local_busy += 1 } + end - def explain - <<-END.gsub(/^ {8}/, '') + def decrease_local_busy + @lock.synchronize { @local_busy -= 1 } + end + + def local_busy? + @local_busy.positive? + end + + def explain + <<-INFO.gsub(/^ {8}/, '') Current sidekiq process: #{Selector.uuid} All processes: #{Monitor.all_processes.join "\n"} @@ -151,33 +156,35 @@ Process limit: #{process_limit.inspect} Blocking: #{blocking?} - END - end + INFO + end - def remove_locks_except!(processes) - locked_processes = probed_processes.uniq - (locked_processes - processes).each do |dead_process| - remove_lock! dead_process - end - end + def remove_locks_except!(processes) + locked_processes = probed_processes.uniq + (locked_processes - processes).each do |dead_process| + remove_lock! dead_process + end + end - def remove_lock!(process) - redis do |it| - it.lrem "#{PREFIX}:probed:#@name", 0, process - it.lrem "#{PREFIX}:busy:#@name", 0, process - end - end + def remove_lock!(process) + redis do |it| + it.lrem "#{PREFIX}:probed:#{@name}", 0, process + it.lrem "#{PREFIX}:busy:#{@name}", 0, process + end + end - private + private - def redis(&block) - Sidekiq.redis(&block) - end + def redis(&block) + Sidekiq.redis(&block) + end - def namespace - Sidekiq::LimitFetch::Queues.namespace + def namespace + Sidekiq::LimitFetch::Queues.namespace + end + end end end end