lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.3.1 vs lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.3.2

- old
+ new

@@ -1,10 +1,7 @@ # frozen_string_literal: true -# stdlib -require "thread" -# 3rd party require "celluloid" if Sidekiq::VERSION < "4.0.0" require "sidekiq" require "sidekiq/fetch" module Sidekiq @@ -15,16 +12,14 @@ class UnitOfWork < ::Sidekiq::BasicFetch::UnitOfWork alias job message if Sidekiq::VERSION < "4.0.0" end - # Class constructor - def initialize(*args) - @mutex = Mutex.new - @suspended = [] - - super(*args) + def initialize(options) + @strictly_ordered_queues = (options[:strict] ? true : false) + @queues = options[:queues].map { |q| "queue:#{q}" } + @queues.uniq! if @strictly_ordered_queues end # @return [Sidekiq::BasicFetch::UnitOfWork, nil] def retrieve_work work = brpop @@ -33,37 +28,20 @@ work = UnitOfWork.new(*work) return work unless Throttled.throttled? work.job queue = "queue:#{work.queue_name}" - @mutex.synchronize { @suspended << queue } Sidekiq.redis { |conn| conn.lpush(queue, work.job) } nil end private # Tries to pop pair of `queue` and job `message` out of sidekiq queue. # @return [Array<String, String>, nil] def brpop - queues = if @strictly_ordered_queues - @unique_queues.dup - else - @queues.shuffle.uniq - end - - @mutex.synchronize do - next if @suspended.empty? - queues -= @suspended - @suspended.clear - end - - if queues.empty? - sleep TIMEOUT - return - end - + queues = (@strictly_ordered_queues ? @queues : @queues.shuffle.uniq) Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) } end end end end