lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.2.0 vs lib/sidekiq/throttled/basic_fetch.rb in sidekiq-throttled-0.3.0.pre
- old
+ new
@@ -1,18 +1,24 @@
# frozen_string_literal: true
# stdlib
require "thread"
# 3rd party
-require "celluloid"
+require "celluloid" if Sidekiq::VERSION < "4.0.0"
require "sidekiq"
require "sidekiq/fetch"
module Sidekiq
module Throttled
# Throttled version of `Sidekiq::BasicFetch` fetcher strategy.
class BasicFetch < ::Sidekiq::BasicFetch
+ TIMEOUT = 2
+
+ class UnitOfWork < ::Sidekiq::BasicFetch::UnitOfWork
+ alias job message if Sidekiq::VERSION < "4.0.0"
+ end
+
# Class constructor
def initialize(*args)
@mutex = Mutex.new
@suspended = []
@@ -22,17 +28,17 @@
# @return [Sidekiq::BasicFetch::UnitOfWork, nil]
def retrieve_work
work = brpop
return unless work
- work = ::Sidekiq::BasicFetch::UnitOfWork.new(*work)
- return work unless Throttled.throttled? work.message
+ work = UnitOfWork.new(*work)
+ return work unless Throttled.throttled? work.job
queue = "queue:#{work.queue_name}"
@mutex.synchronize { @suspended << queue }
- Sidekiq.redis { |conn| conn.lpush(queue, work.message) }
+ Sidekiq.redis { |conn| conn.lpush(queue, work.job) }
nil
end
private
@@ -51,14 +57,14 @@
queues -= @suspended
@suspended.clear
end
if queues.empty?
- sleep Sidekiq::Fetcher::TIMEOUT
+ sleep TIMEOUT
return
end
- Sidekiq.redis { |conn| conn.brpop(*queues, Sidekiq::Fetcher::TIMEOUT) }
+ Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) }
end
end
end
end