lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.1 vs lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.2
- old
+ new
@@ -1,9 +1,9 @@
# frozen_string_literal: true
require "sidekiq"
-require "sidekiq/throttled/unit_of_work"
+require "sidekiq/throttled/fetch/unit_of_work"
require "sidekiq/throttled/queues_pauser"
require "sidekiq/throttled/queue_name"
module Sidekiq
module Throttled
@@ -20,33 +20,50 @@
@queues = options[:queues].map { |q| QueueName.expand q }
@queues.uniq! if @strict
end
+ # Retrieves job from redis.
+ #
# @return [Sidekiq::Throttled::UnitOfWork, nil]
def retrieve_work
work = brpop
return unless work
work = UnitOfWork.new(*work)
return work unless Throttled.throttled? work.job
- Sidekiq.redis do |conn|
- conn.lpush(QueueName.expand(work.queue_name), work.job)
- end
+ work.throttled_requeue
nil
end
+ class << self
+ # Requeues all given units as a single operation.
+ #
+ # @see http://www.rubydoc.info/github/redis/redis-rb/master/Redis#pipelined-instance_method
+ # @param [Array<Fetch::UnitOfWork>] units
+ # @return [void]
+ def bulk_requeue(units, _options)
+ return if units.empty?
+
+ Sidekiq.logger.debug { "Re-queueing terminated jobs" }
+ Sidekiq.redis { |conn| conn.pipelined { units.each(&:requeue) } }
+ Sidekiq.logger.info("Pushed #{units.size} jobs back to Redis")
+ rescue => e
+ Sidekiq.logger.warn("Failed to requeue #{units.size} jobs: #{e}")
+ end
+ end
+
private
- # Tries to pop pair of `queue` and job `message` out of sidekiq queue.
+ # Tries to pop pair of `queue` and job `message` out of sidekiq queues.
#
# @see http://redis.io/commands/brpop
# @return [Array(String, String), nil]
def brpop
- queues = build_queues_list
+ queues = filter_queues(@strict ? @queues : @queues.shuffle.uniq)
if queues.empty?
sleep TIMEOUT
return
end
@@ -55,12 +72,13 @@
end
# Returns list of queues to try to fetch jobs from.
#
# @note It may return an empty array.
+ # @param [Array<String>] queues
# @return [Array<String>]
- def build_queues_list
- QueuesPauser.instance.filter(@strict ? @queues : @queues.shuffle.uniq)
+ def filter_queues(queues)
+ QueuesPauser.instance.filter(queues)
end
end
end
end