lib/sidekiq/throttled/queues_pauser.rb in sidekiq-throttled-0.9.0 vs lib/sidekiq/throttled/queues_pauser.rb in sidekiq-throttled-0.10.0.alpha
- old
+ new
@@ -37,31 +37,27 @@
# Initializes singleton instance.
def initialize
@paused_queues = Set.new
@communicator = Communicator.instance
+ @mutex = Mutex.new
end
# Configures Sidekiq server to keep actual list of paused queues.
#
# @private
# @return [void]
def setup!
Patches::Queue.apply!
- Sidekiq.configure_server do
- @communicator.receive PAUSE_MESSAGE do |queue|
- @paused_queues << QueueName.expand(queue)
- end
+ Sidekiq.configure_server do |config|
+ config.on(:startup) { start_watcher }
+ config.on(:quiet) { stop_watcher }
- @communicator.receive RESUME_MESSAGE do |queue|
- @paused_queues.delete QueueName.expand(queue)
- end
-
- @communicator.ready do
- @paused_queues.replace(paused_queues.map { |q| QueueName.expand q })
- end
+ @communicator.receive(PAUSE_MESSAGE, &method(:add))
+ @communicator.receive(RESUME_MESSAGE, &method(:delete))
+ @communicator.ready(&method(:sync!))
end
end
# Returns queues list with paused queues being stripped out.
#
@@ -108,9 +104,44 @@
queue = QueueName.normalize queue.to_s
Sidekiq.redis do |conn|
conn.srem(PAUSED_QUEUES, queue)
@communicator.transmit(conn, RESUME_MESSAGE, queue)
+ end
+ end
+
+ private
+
+ def add(queue)
+ @mutex.synchronize do
+ @paused_queues << QueueName.expand(queue)
+ end
+ end
+
+ def delete(queue)
+ @mutex.synchronize do
+ @paused_queues.delete QueueName.expand(queue)
+ end
+ end
+
+ def sync!
+ @mutex.synchronize do
+ @paused_queues.replace(paused_queues.map { |q| QueueName.expand q })
+ end
+ end
+
+ def start_watcher
+ @mutex.synchronize do
+ @watcher ||= Concurrent::TimerTask.execute({
+ :run_now => true,
+ :execution_interval => 60
+ }) { sync! }
+ end
+ end
+
+ def stop_watcher
+ @mutex.synchronize do
+ defined?(@watcher) && @watcher&.shutdown
end
end
end
end
end