Sha256: 709191d2ad0e76ff4bed94b6e3df26dd6e8cd4f0d054bccefa8d9bca0551df0b
Contents?: true
Size: 1.43 KB
Versions: 1
Compression:
Stored size: 1.43 KB
Contents
# stdlib
require "thread"

# 3rd party
require "celluloid"
require "sidekiq"
require "sidekiq/fetch"

module Sidekiq
  module Throttled
    # Fetcher strategy derived from `Sidekiq::BasicFetch`. A popped job is
    # handed back to its queue (instead of being returned to the caller)
    # whenever `Throttled.throttled?` answers truthy for the job's message.
    class BasicFetch < ::Sidekiq::BasicFetch
      # Prepares the lock and the list of suspended queue keys, then forwards
      # all arguments unchanged to the parent constructor.
      #
      # @param args [Array] whatever `Sidekiq::BasicFetch#initialize` accepts
      def initialize(*args)
        @mutex     = Mutex.new
        @suspended = []
        super(*args)
      end

      # Pops one job and either returns it or pushes it back when throttled.
      #
      # When the job is throttled, its queue key is recorded in `@suspended`
      # (so the next pop leaves that queue out) and the message is `LPUSH`ed
      # back onto the same key.
      #
      # @return [Sidekiq::BasicFetch::UnitOfWork, nil] the unit of work, or
      #   `nil` when nothing was popped or the popped job was throttled
      def retrieve_work
        payload = brpop
        return unless payload

        unit = ::Sidekiq::BasicFetch::UnitOfWork.new(*payload)

        if Throttled.throttled?(unit.message)
          key = "queue:#{unit.queue_name}"

          # `@suspended` is also read and cleared in `#brpop`, hence the lock.
          @mutex.synchronize { @suspended.push(key) }
          Sidekiq.redis { |conn| conn.lpush(key, unit.message) }

          nil
        else
          unit
        end
      end

      private

      # Tries to pop a pair of `queue` and job `message` out of sidekiq queue.
      #
      # Queues recorded in `@suspended` are excluded from this attempt only:
      # the list is emptied as soon as it has been applied.
      #
      # @return [Array<String, String>, nil]
      def brpop
        candidates = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq

        @mutex.synchronize do
          unless @suspended.empty?
            candidates -= @suspended
            @suspended.clear
          end
        end

        unless candidates.empty?
          return Sidekiq.redis { |conn| conn.brpop(*candidates, Sidekiq::Fetcher::TIMEOUT) }
        end

        # Nothing left to poll: wait out the usual timeout instead of hitting
        # Redis (presumably to keep the caller from busy-looping).
        sleep Sidekiq::Fetcher::TIMEOUT
        nil
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
sidekiq-throttled-0.1.0 | lib/sidekiq/throttled/basic_fetch.rb |