lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.6 vs lib/sidekiq/throttled/fetch.rb in sidekiq-throttled-0.6.7
- old
+ new
@@ -1,23 +1,27 @@
# frozen_string_literal: true
require "sidekiq"
+require "sidekiq/throttled/expirable_list"
require "sidekiq/throttled/fetch/unit_of_work"
require "sidekiq/throttled/queues_pauser"
require "sidekiq/throttled/queue_name"
module Sidekiq
module Throttled
# Throttled fetch strategy.
#
# @private
class Fetch
+ # Timeout to sleep between fetch retries in case of no job received,
+ # as well as timeout to wait for redis to give us something to work.
TIMEOUT = 2
- private_constant :TIMEOUT
# Initializes fetcher instance.
def initialize(options)
+ @paused = ExpirableList.new(TIMEOUT)
+
@strict = options[:strict]
@queues = options[:queues].map { |q| QueueName.expand q }
@queues.uniq! if @strict
end
@@ -31,11 +35,11 @@
work = UnitOfWork.new(*work)
return work unless work.throttled?
work.requeue_throttled
- sleep TIMEOUT
+ @paused << QueueName.expand(work.queue_name)
nil
end
class << self
@@ -76,10 +80,10 @@
#
# @note It may return an empty array.
# @param [Array<String>] queues
# @return [Array<String>]
def filter_queues(queues)
- QueuesPauser.instance.filter(queues)
+ QueuesPauser.instance.filter(queues) - @paused.to_a
end
end
end
end