lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.3.1 vs lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.3.2
- old
+ new
@@ -1,10 +1,7 @@
# frozen_string_literal: true
-# stdlib
-require "thread"
-# 3rd party
require "celluloid" if Sidekiq::VERSION < "4.0.0"
require "sidekiq"
require "sidekiq/fetch"
module Sidekiq
@@ -15,16 +12,14 @@
class UnitOfWork < ::Sidekiq::BasicFetch::UnitOfWork
alias job message if Sidekiq::VERSION < "4.0.0"
end
- # Class constructor
- def initialize(*args)
- @mutex = Mutex.new
- @suspended = []
-
- super(*args)
+ def initialize(options)
+ @strictly_ordered_queues = (options[:strict] ? true : false)
+ @queues = options[:queues].map { |q| "queue:#{q}" }
+ @queues.uniq! if @strictly_ordered_queues
end
# @return [Sidekiq::BasicFetch::UnitOfWork, nil]
def retrieve_work
work = brpop
@@ -33,37 +28,20 @@
work = UnitOfWork.new(*work)
return work unless Throttled.throttled? work.job
queue = "queue:#{work.queue_name}"
- @mutex.synchronize { @suspended << queue }
Sidekiq.redis { |conn| conn.lpush(queue, work.job) }
nil
end
private
# Tries to pop pair of `queue` and job `message` out of sidekiq queue.
# @return [Array<String, String>, nil]
def brpop
- queues = if @strictly_ordered_queues
- @unique_queues.dup
- else
- @queues.shuffle.uniq
- end
-
- @mutex.synchronize do
- next if @suspended.empty?
- queues -= @suspended
- @suspended.clear
- end
-
- if queues.empty?
- sleep TIMEOUT
- return
- end
-
+ queues = (@strictly_ordered_queues ? @queues : @queues.shuffle.uniq)
Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) }
end
end
end
end