lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-4.4.0 vs lib/sidekiq/limit_fetch/global/semaphore.rb in sidekiq-limit_fetch-4.4.1
- old
+ new
@@ -1,138 +1,143 @@
-module Sidekiq::LimitFetch::Global
- class Semaphore
- PREFIX = 'limit_fetch'
+# frozen_string_literal: true
- attr_reader :local_busy
+module Sidekiq
+ module LimitFetch
+ module Global
+ class Semaphore
+ PREFIX = 'limit_fetch'
- def initialize(name)
- @name = name
- @lock = Mutex.new
- @local_busy = 0
- end
+ attr_reader :local_busy
- def limit
- value = redis {|it| it.get "#{PREFIX}:limit:#@name" }
- value.to_i if value
- end
+ def initialize(name)
+ @name = name
+ @lock = Mutex.new
+ @local_busy = 0
+ end
- def limit=(value)
- @limit_changed = true
+ def limit
+ value = redis { |it| it.get "#{PREFIX}:limit:#{@name}" }
+ value&.to_i
+ end
- if value
- redis {|it| it.set "#{PREFIX}:limit:#@name", value }
- else
- redis {|it| it.del "#{PREFIX}:limit:#@name" }
- end
- end
+ def limit=(value)
+ @limit_changed = true
- def limit_changed?
- @limit_changed
- end
+ if value
+ redis { |it| it.set "#{PREFIX}:limit:#{@name}", value }
+ else
+ redis { |it| it.del "#{PREFIX}:limit:#{@name}" }
+ end
+ end
- def process_limit
- value = redis {|it| it.get "#{PREFIX}:process_limit:#@name" }
- value.to_i if value
- end
+ def limit_changed?
+ @limit_changed
+ end
- def process_limit=(value)
- if value
- redis {|it| it.set "#{PREFIX}:process_limit:#@name", value }
- else
- redis {|it| it.del "#{PREFIX}:process_limit:#@name" }
- end
- end
+ def process_limit
+ value = redis { |it| it.get "#{PREFIX}:process_limit:#{@name}" }
+ value&.to_i
+ end
- def acquire
- Selector.acquire([@name], namespace).size > 0
- end
+ def process_limit=(value)
+ if value
+ redis { |it| it.set "#{PREFIX}:process_limit:#{@name}", value }
+ else
+ redis { |it| it.del "#{PREFIX}:process_limit:#{@name}" }
+ end
+ end
- def release
- redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid }
- end
+ def acquire
+ Selector.acquire([@name], namespace).size.positive?
+ end
- def busy
- redis {|it| it.llen "#{PREFIX}:busy:#@name" }
- end
+ def release
+ redis { |it| it.lrem "#{PREFIX}:probed:#{@name}", 1, Selector.uuid }
+ end
- def busy_processes
- redis {|it| it.lrange "#{PREFIX}:busy:#@name", 0, -1 }
- end
+ def busy
+ redis { |it| it.llen "#{PREFIX}:busy:#{@name}" }
+ end
- def increase_busy
- increase_local_busy
- redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
- end
+ def busy_processes
+ redis { |it| it.lrange "#{PREFIX}:busy:#{@name}", 0, -1 }
+ end
- def decrease_busy
- decrease_local_busy
- redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid }
- end
+ def increase_busy
+ increase_local_busy
+ redis { |it| it.rpush "#{PREFIX}:busy:#{@name}", Selector.uuid }
+ end
- def probed
- redis {|it| it.llen "#{PREFIX}:probed:#@name" }
- end
+ def decrease_busy
+ decrease_local_busy
+ redis { |it| it.lrem "#{PREFIX}:busy:#{@name}", 1, Selector.uuid }
+ end
- def probed_processes
- redis {|it| it.lrange "#{PREFIX}:probed:#@name", 0, -1 }
- end
+ def probed
+ redis { |it| it.llen "#{PREFIX}:probed:#{@name}" }
+ end
- def pause
- redis {|it| it.set "#{PREFIX}:pause:#@name", '1' }
- end
+ def probed_processes
+ redis { |it| it.lrange "#{PREFIX}:probed:#{@name}", 0, -1 }
+ end
- def pause_for_ms ms
- redis {|it| it.psetex "#{PREFIX}:pause:#@name", ms, 1 }
- end
+ def pause
+ redis { |it| it.set "#{PREFIX}:pause:#{@name}", '1' }
+ end
- def unpause
- redis {|it| it.del "#{PREFIX}:pause:#@name" }
- end
+ def pause_for_ms(milliseconds)
+ redis { |it| it.psetex "#{PREFIX}:pause:#{@name}", milliseconds, 1 }
+ end
- def paused?
- redis {|it| it.get "#{PREFIX}:pause:#@name" } == '1'
- end
+ def unpause
+ redis { |it| it.del "#{PREFIX}:pause:#{@name}" }
+ end
- def block
- redis {|it| it.set "#{PREFIX}:block:#@name", '1' }
- end
+ def paused?
+ redis { |it| it.get "#{PREFIX}:pause:#{@name}" } == '1'
+ end
- def block_except(*queues)
- raise ArgumentError if queues.empty?
- redis {|it| it.set "#{PREFIX}:block:#@name", queues.join(',') }
- end
+ def block
+ redis { |it| it.set "#{PREFIX}:block:#{@name}", '1' }
+ end
- def unblock
- redis {|it| it.del "#{PREFIX}:block:#@name" }
- end
+ def block_except(*queues)
+ raise ArgumentError if queues.empty?
- def blocking?
- redis {|it| it.get "#{PREFIX}:block:#@name" } == '1'
- end
+ redis { |it| it.set "#{PREFIX}:block:#{@name}", queues.join(',') }
+ end
- def clear_limits
- redis do |it|
- %w(block busy limit pause probed process_limit).each do |key|
- it.del "#{PREFIX}:#{key}:#@name"
+ def unblock
+ redis { |it| it.del "#{PREFIX}:block:#{@name}" }
end
- end
- end
- def increase_local_busy
- @lock.synchronize { @local_busy += 1 }
- end
+ def blocking?
+ redis { |it| it.get "#{PREFIX}:block:#{@name}" } == '1'
+ end
- def decrease_local_busy
- @lock.synchronize { @local_busy -= 1 }
- end
+ def clear_limits
+ redis do |it|
+ %w[block busy limit pause probed process_limit].each do |key|
+ it.del "#{PREFIX}:#{key}:#{@name}"
+ end
+ end
+ end
- def local_busy?
- @local_busy > 0
- end
+ def increase_local_busy
+ @lock.synchronize { @local_busy += 1 }
+ end
- def explain
- <<-END.gsub(/^ {8}/, '')
+ def decrease_local_busy
+ @lock.synchronize { @local_busy -= 1 }
+ end
+
+ def local_busy?
+ @local_busy.positive?
+ end
+
+ def explain
+ <<-INFO.gsub(/^ {8}/, '')
Current sidekiq process: #{Selector.uuid}
All processes:
#{Monitor.all_processes.join "\n"}
@@ -151,33 +156,35 @@
Process limit:
#{process_limit.inspect}
Blocking:
#{blocking?}
- END
- end
+ INFO
+ end
- def remove_locks_except!(processes)
- locked_processes = probed_processes.uniq
- (locked_processes - processes).each do |dead_process|
- remove_lock! dead_process
- end
- end
+ def remove_locks_except!(processes)
+ locked_processes = probed_processes.uniq
+ (locked_processes - processes).each do |dead_process|
+ remove_lock! dead_process
+ end
+ end
- def remove_lock!(process)
- redis do |it|
- it.lrem "#{PREFIX}:probed:#@name", 0, process
- it.lrem "#{PREFIX}:busy:#@name", 0, process
- end
- end
+ def remove_lock!(process)
+ redis do |it|
+ it.lrem "#{PREFIX}:probed:#{@name}", 0, process
+ it.lrem "#{PREFIX}:busy:#{@name}", 0, process
+ end
+ end
- private
+ private
- def redis(&block)
- Sidekiq.redis(&block)
- end
+ def redis(&block)
+ Sidekiq.redis(&block)
+ end
- def namespace
- Sidekiq::LimitFetch::Queues.namespace
+ def namespace
+ Sidekiq::LimitFetch::Queues.namespace
+ end
+ end
end
end
end