lib/sidekiq/throttled/queues_pauser.rb in sidekiq-throttled-0.9.0 vs lib/sidekiq/throttled/queues_pauser.rb in sidekiq-throttled-0.10.0.alpha

- old
+ new

@@ -37,31 +37,27 @@ # Initializes singleton instance. def initialize @paused_queues = Set.new @communicator = Communicator.instance + @mutex = Mutex.new end # Configures Sidekiq server to keep actual list of paused queues. # # @private # @return [void] def setup! Patches::Queue.apply! - Sidekiq.configure_server do - @communicator.receive PAUSE_MESSAGE do |queue| - @paused_queues << QueueName.expand(queue) - end + Sidekiq.configure_server do |config| + config.on(:startup) { start_watcher } + config.on(:quiet) { stop_watcher } - @communicator.receive RESUME_MESSAGE do |queue| - @paused_queues.delete QueueName.expand(queue) - end - - @communicator.ready do - @paused_queues.replace(paused_queues.map { |q| QueueName.expand q }) - end + @communicator.receive(PAUSE_MESSAGE, &method(:add)) + @communicator.receive(RESUME_MESSAGE, &method(:delete)) + @communicator.ready(&method(:sync!)) end end # Returns queues list with paused queues being stripped out. # @@ -108,9 +104,44 @@ queue = QueueName.normalize queue.to_s Sidekiq.redis do |conn| conn.srem(PAUSED_QUEUES, queue) @communicator.transmit(conn, RESUME_MESSAGE, queue) + end + end + + private + + def add(queue) + @mutex.synchronize do + @paused_queues << QueueName.expand(queue) + end + end + + def delete(queue) + @mutex.synchronize do + @paused_queues.delete QueueName.expand(queue) + end + end + + def sync! + @mutex.synchronize do + @paused_queues.replace(paused_queues.map { |q| QueueName.expand q }) + end + end + + def start_watcher + @mutex.synchronize do + @watcher ||= Concurrent::TimerTask.execute({ + :run_now => true, + :execution_interval => 60 + }) { sync! } + end + end + + def stop_watcher + @mutex.synchronize do + defined?(@watcher) && @watcher&.shutdown end end end end end