lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.18.0 vs lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-1.0.0.alpha
- old
+ new
@@ -1,94 +1,10 @@
# frozen_string_literal: true
-require "sidekiq"
-require "sidekiq/throttled/expirable_list"
-require "sidekiq/throttled/fetch/unit_of_work"
-require "sidekiq/throttled/queues_pauser"
-require "sidekiq/throttled/queue_name"
+require_relative "./basic_fetch"
module Sidekiq
module Throttled
- # Throttled fetch strategy.
- #
- # @private
- class Fetch
- # Timeout to sleep between fetch retries in case of no job received,
- # as well as timeout to wait for redis to give us something to work.
- TIMEOUT = 2
-
- # Initializes fetcher instance.
- # @param options [Hash]
- # @option options [Integer] :throttled_queue_cooldown (TIMEOUT)
- # Min delay in seconds before queue will be polled again after
- # throttled job.
- # @option options [Boolean] :strict (false)
- # @option options [Array<#to_s>] :queue
- def initialize(options)
- @paused = ExpirableList.new(options.fetch(:throttled_queue_cooldown, TIMEOUT))
-
- @strict = options.fetch(:strict, false)
- @queues = options.fetch(:queues).map { |q| QueueName.expand q }
-
- raise ArgumentError, "empty :queues" if @queues.empty?
-
- @queues.uniq! if @strict
- end
-
- # Retrieves job from redis.
- #
- # @return [Sidekiq::Throttled::UnitOfWork, nil]
- def retrieve_work
- work = brpop
- return unless work
-
- work = UnitOfWork.new(*work)
- return work unless work.throttled?
-
- work.requeue_throttled
- @paused << QueueName.expand(work.queue_name)
-
- nil
- end
-
- def bulk_requeue(units, _options)
- return if units.empty?
-
- Sidekiq.logger.debug { "Re-queueing terminated jobs" }
- Sidekiq.redis do |conn|
- conn.pipelined do |pipeline|
- units.each { |unit| unit.requeue(pipeline) }
- end
- end
- Sidekiq.logger.info("Pushed #{units.size} jobs back to Redis")
- rescue => e
- Sidekiq.logger.warn("Failed to requeue #{units.size} jobs: #{e}")
- end
-
- private
-
- # Tries to pop pair of `queue` and job `message` out of sidekiq queues.
- #
- # @see http://redis.io/commands/brpop
- # @return [Array(String, String), nil]
- def brpop
- queues = filter_queues(@strict ? @queues : @queues.shuffle.uniq)
-
- if queues.empty?
- sleep TIMEOUT
- return
- end
-
- Sidekiq.redis { |conn| conn.brpop(*queues, :timeout => TIMEOUT) }
- end
-
- # Returns list of queues to try to fetch jobs from.
- #
- # @note It may return an empty array.
- # @param [Array<String>] queues
- # @return [Array<String>]
- def filter_queues(queues)
- QueuesPauser.instance.filter(queues) - @paused.to_a
- end
- end
+ # @deprecated Use Sidekiq::Throttled::BasicFetch
+ Fetch = BasicFetch
end
end