Sha256: 709191d2ad0e76ff4bed94b6e3df26dd6e8cd4f0d054bccefa8d9bca0551df0b

Contents?: true

Size: 1.43 KB

Versions: 1

Compression:

Stored size: 1.43 KB

Contents

# stdlib
require "thread"

# 3rd party
require "celluloid"
require "sidekiq"
require "sidekiq/fetch"

module Sidekiq
  module Throttled
    # Fetch strategy layered on top of `Sidekiq::BasicFetch`: every popped
    # job is checked with `Throttled.throttled?`; throttled jobs are pushed
    # back to their queue and that queue is left out of the next pop attempt.
    class BasicFetch < ::Sidekiq::BasicFetch
      # Prepares the bookkeeping for suspended queues and then delegates to
      # the parent constructor.
      #
      # @param args arguments forwarded untouched to `Sidekiq::BasicFetch`
      def initialize(*args)
        # `@suspended` is read and written under `@mutex` everywhere below.
        @mutex     = Mutex.new
        @suspended = []

        super(*args)
      end

      # Pops a single job and hands it out unless it is throttled.
      #
      # A throttled job is not returned: its queue key is remembered in
      # `@suspended` and the message is pushed back to that key via `lpush`.
      #
      # @return [Sidekiq::BasicFetch::UnitOfWork, nil] the unit of work, or
      #   `nil` when nothing was popped or the popped job is throttled
      def retrieve_work
        payload = brpop
        return unless payload

        unit = ::Sidekiq::BasicFetch::UnitOfWork.new(*payload)

        if Throttled.throttled?(unit.message)
          key = "queue:#{unit.queue_name}"

          # Remember the queue first, then give the message back to redis.
          @mutex.synchronize { @suspended.push(key) }
          Sidekiq.redis { |redis| redis.lpush(key, unit.message) }

          nil
        else
          unit
        end
      end

      private

      # Attempts to pop a `queue` / job `message` pair from the sidekiq
      # queues, leaving out queues that are currently marked as suspended.
      #
      # The suspended list is emptied as soon as it has been applied, so a
      # queue is skipped for a single call only. When no queue is left to
      # poll, the method sleeps for `Sidekiq::Fetcher::TIMEOUT` instead.
      #
      # @return [Array<String, String>, nil]
      def brpop
        candidates = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq

        @mutex.synchronize do
          unless @suspended.empty?
            candidates -= @suspended
            @suspended.clear
          end
        end

        unless candidates.empty?
          return Sidekiq.redis { |redis| redis.brpop(*candidates, Sidekiq::Fetcher::TIMEOUT) }
        end

        # Nothing to poll: wait as long as a blocking pop would have.
        sleep Sidekiq::Fetcher::TIMEOUT
        nil
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
sidekiq-throttled-0.1.0 lib/sidekiq/throttled/basic_fetch.rb