lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.2.0 vs lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.3.0.pre

- old
+ new

@@ -1,18 +1,24 @@ # frozen_string_literal: true # stdlib require "thread" # 3rd party -require "celluloid" +require "celluloid" if Sidekiq::VERSION < "4.0.0" require "sidekiq" require "sidekiq/fetch" module Sidekiq module Throttled # Throttled version of `Sidekiq::BasicFetch` fetcher strategy. class BasicFetch < ::Sidekiq::BasicFetch + TIMEOUT = 2 + + class UnitOfWork < ::Sidekiq::BasicFetch::UnitOfWork + alias job message if Sidekiq::VERSION < "4.0.0" + end + # Class constructor def initialize(*args) @mutex = Mutex.new @suspended = [] @@ -22,17 +28,17 @@ # @return [Sidekiq::BasicFetch::UnitOfWork, nil] def retrieve_work work = brpop return unless work - work = ::Sidekiq::BasicFetch::UnitOfWork.new(*work) - return work unless Throttled.throttled? work.message + work = UnitOfWork.new(*work) + return work unless Throttled.throttled? work.job queue = "queue:#{work.queue_name}" @mutex.synchronize { @suspended << queue } - Sidekiq.redis { |conn| conn.lpush(queue, work.message) } + Sidekiq.redis { |conn| conn.lpush(queue, work.job) } nil end private @@ -51,14 +57,14 @@ queues -= @suspended @suspended.clear end if queues.empty? - sleep Sidekiq::Fetcher::TIMEOUT + sleep TIMEOUT return end - Sidekiq.redis { |conn| conn.brpop(*queues, Sidekiq::Fetcher::TIMEOUT) } + Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) } end end end end