lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.16.2 vs lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.17.0
- old
+ new
@@ -10,37 +10,10 @@
module Throttled
# Throttled fetch strategy.
#
# @private
class Fetch
- module BulkRequeue
- # Requeues all given units as a single operation.
- #
- # @see http://www.rubydoc.info/github/redis/redis-rb/master/Redis#pipelined-instance_method
- # @param [Array<Fetch::UnitOfWork>] units
- # @return [void]
- def bulk_requeue(units, _options)
- return if units.empty?
-
- Sidekiq.logger.debug { "Re-queueing terminated jobs" }
- Sidekiq.redis do |conn|
- conn.pipelined do |pipeline|
- units.each { |unit| unit.requeue(pipeline) }
- end
- end
- Sidekiq.logger.info("Pushed #{units.size} jobs back to Redis")
- rescue => e
- Sidekiq.logger.warn("Failed to requeue #{units.size} jobs: #{e}")
- end
- end
-
- # https://github.com/mperham/sidekiq/commit/fce05c9d4b4c0411c982078a4cf3a63f20f739bc
- if Gem::Version.new(Sidekiq::VERSION) < Gem::Version.new("6.1.0")
- extend BulkRequeue
- else
- include BulkRequeue
- end
# Timeout to sleep between fetch retries in case of no job received,
# as well as timeout to wait for redis to give us something to work.
TIMEOUT = 2
# Initializes fetcher instance.
@@ -75,10 +48,24 @@
@paused << QueueName.expand(work.queue_name)
nil
end
+ def bulk_requeue(units, _options)
+ return if units.empty?
+
+ Sidekiq.logger.debug { "Re-queueing terminated jobs" }
+ Sidekiq.redis do |conn|
+ conn.pipelined do |pipeline|
+ units.each { |unit| unit.requeue(pipeline) }
+ end
+ end
+ Sidekiq.logger.info("Pushed #{units.size} jobs back to Redis")
+ rescue => e
+ Sidekiq.logger.warn("Failed to requeue #{units.size} jobs: #{e}")
+ end
+
private
# Tries to pop pair of `queue` and job `message` out of sidekiq queues.
#
# @see http://redis.io/commands/brpop
@@ -89,10 +76,10 @@
if queues.empty?
sleep TIMEOUT
return
end
- Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) }
+ Sidekiq.redis { |conn| conn.brpop(*queues, :timeout => TIMEOUT) }
end
# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.