lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.16.2 vs lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.17.0

- old
+ new

@@ -10,37 +10,10 @@ module Throttled # Throttled fetch strategy. # # @private class Fetch - module BulkRequeue - # Requeues all given units as a single operation. - # - # @see http://www.rubydoc.info/github/redis/redis-rb/master/Redis#pipelined-instance_method - # @param [Array<Fetch::UnitOfWork>] units - # @return [void] - def bulk_requeue(units, _options) - return if units.empty? - - Sidekiq.logger.debug { "Re-queueing terminated jobs" } - Sidekiq.redis do |conn| - conn.pipelined do |pipeline| - units.each { |unit| unit.requeue(pipeline) } - end - end - Sidekiq.logger.info("Pushed #{units.size} jobs back to Redis") - rescue => e - Sidekiq.logger.warn("Failed to requeue #{units.size} jobs: #{e}") - end - end - - # https://github.com/mperham/sidekiq/commit/fce05c9d4b4c0411c982078a4cf3a63f20f739bc - if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("6.1.0") - extend BulkRequeue - else - include BulkRequeue - end # Timeout to sleep between fetch retries in case of no job received, # as well as timeout to wait for redis to give us something to work. TIMEOUT = 2 # Initializes fetcher instance. @@ -75,10 +48,24 @@ @paused << QueueName.expand(work.queue_name) nil end + def bulk_requeue(units, _options) + return if units.empty? + + Sidekiq.logger.debug { "Re-queueing terminated jobs" } + Sidekiq.redis do |conn| + conn.pipelined do |pipeline| + units.each { |unit| unit.requeue(pipeline) } + end + end + Sidekiq.logger.info("Pushed #{units.size} jobs back to Redis") + rescue => e + Sidekiq.logger.warn("Failed to requeue #{units.size} jobs: #{e}") + end + private # Tries to pop pair of `queue` and job `message` out of sidekiq queues. # # @see http://redis.io/commands/brpop @@ -89,10 +76,10 @@ if queues.empty? sleep TIMEOUT return end - Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) } + Sidekiq.redis { |conn| conn.brpop(*queues, :timeout => TIMEOUT) } end # Returns list of queues to try to fetch jobs from. # # @note It may return an empty array.