lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.6 vs lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.7

- old
+ new

@@ -1,23 +1,27 @@ # frozen_string_literal: true require "sidekiq" +require "sidekiq/throttled/expirable_list" require "sidekiq/throttled/fetch/unit_of_work" require "sidekiq/throttled/queues_pauser" require "sidekiq/throttled/queue_name" module Sidekiq module Throttled # Throttled fetch strategy. # # @private class Fetch + # Timeout to sleep between fetch retries in case of no job received, + # as well as timeout to wait for redis to give us something to work. TIMEOUT = 2 - private_constant :TIMEOUT # Initializes fetcher instance. def initialize(options) + @paused = ExpirableList.new(TIMEOUT) + @strict = options[:strict] @queues = options[:queues].map { |q| QueueName.expand q } @queues.uniq! if @strict end @@ -31,11 +35,11 @@ work = UnitOfWork.new(*work) return work unless work.throttled? work.requeue_throttled - sleep TIMEOUT + @paused << QueueName.expand(work.queue_name) nil end class << self @@ -76,10 +80,10 @@ # # @note It may return an empty array. # @param [Array<String>] queues # @return [Array<String>] def filter_queues(queues) - QueuesPauser.instance.filter(queues) + QueuesPauser.instance.filter(queues) - @paused.to_a end end end end