lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.1 vs lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.2

- old
+ new

@@ -1,9 +1,9 @@ # frozen_string_literal: true require "sidekiq" -require "sidekiq/throttled/unit_of_work" +require "sidekiq/throttled/fetch/unit_of_work" require "sidekiq/throttled/queues_pauser" require "sidekiq/throttled/queue_name" module Sidekiq module Throttled @@ -20,33 +20,50 @@ @queues = options[:queues].map { |q| QueueName.expand q } @queues.uniq! if @strict end + # Retrieves job from redis. + # # @return [Sidekiq::Throttled::UnitOfWork, nil] def retrieve_work work = brpop return unless work work = UnitOfWork.new(*work) return work unless Throttled.throttled? work.job - Sidekiq.redis do |conn| - conn.lpush(QueueName.expand(work.queue_name), work.job) - end + work.throttled_requeue nil end + class << self + # Requeues all given units as a single operation. + # + # @see http://www.rubydoc.info/github/redis/redis-rb/master/Redis#pipelined-instance_method + # @param [Array<Fetch::UnitOfWork>] units + # @return [void] + def bulk_requeue(units, _options) + return if units.empty? + + Sidekiq.logger.debug { "Re-queueing terminated jobs" } + Sidekiq.redis { |conn| conn.pipelined { units.each(&:requeue) } } + Sidekiq.logger.info("Pushed #{units.size} jobs back to Redis") + rescue => e + Sidekiq.logger.warn("Failed to requeue #{units.size} jobs: #{e}") + end + end + private - # Tries to pop pair of `queue` and job `message` out of sidekiq queue. + # Tries to pop pair of `queue` and job `message` out of sidekiq queues. # # @see http://redis.io/commands/brpop # @return [Array(String, String), nil] def brpop - queues = build_queues_list + queues = filter_queues(@strict ? @queues : @queues.shuffle.uniq) if queues.empty? sleep TIMEOUT return end @@ -55,12 +72,13 @@ end # Returns list of queues to try to fetch jobs from. # # @note It may return an empty array. + # @param [Array<String>] queues # @return [Array<String>] - def build_queues_list - QueuesPauser.instance.filter(@strict ? @queues : @queues.shuffle.uniq) + def filter_queues(queues) + QueuesPauser.instance.filter(queues) end end end end