Sha256: 307d49fc3067fcd24e8110bf4eb0387967c575fb135448a5e02c06e445feef85

Contents?: true

Size: 1.48 KB

Versions: 1

Compression:

Stored size: 1.48 KB

Contents

# frozen_string_literal: true
# stdlib
require "thread"

# 3rd party
require "celluloid"
require "sidekiq"
require "sidekiq/fetch"

module Sidekiq
  module Throttled
    # Throttled version of `Sidekiq::BasicFetch` fetcher strategy.
    class BasicFetch < ::Sidekiq::BasicFetch
      # Sets up the throttling bookkeeping and then delegates to the parent
      # constructor with the very same arguments.
      #
      # @param args arguments forwarded untouched to `Sidekiq::BasicFetch`
      def initialize(*args)
        # Guards every read and write of `@suspended`.
        @mutex = Mutex.new
        # Redis keys of queues to leave out of the next pop attempt.
        @suspended = []

        super
      end

      # Pops the next job and hands it out unless it is currently throttled.
      # A throttled job is put back into its queue and `nil` is returned
      # instead.
      #
      # @return [Sidekiq::BasicFetch::UnitOfWork, nil]
      def retrieve_work
        popped = brpop

        if popped
          unit = ::Sidekiq::BasicFetch::UnitOfWork.new(*popped)
          Throttled.throttled?(unit.message) ? requeue_throttled(unit) : unit
        end
      end

      private

      # Marks the queue of the given unit of work as suspended and pushes its
      # message back to Redis. The message is pushed with LPUSH, i.e. onto the
      # opposite end of the list from the one {#brpop} pops from.
      #
      # @param unit [Sidekiq::BasicFetch::UnitOfWork] throttled unit of work
      # @return [nil]
      def requeue_throttled(unit)
        redis_key = "queue:#{unit.queue_name}"

        # Record the suspension first, then give the message back.
        @mutex.synchronize { @suspended << redis_key }
        Sidekiq.redis { |conn| conn.lpush(redis_key, unit.message) }

        nil
      end

      # Attempts to pop a `queue` / job `message` pair from the sidekiq
      # queues, leaving out queues that were suspended since the last attempt.
      # When no queue is left to poll, sleeps for the fetch timeout instead.
      #
      # @return [Array<String, String>, nil]
      def brpop
        candidates = @strictly_ordered_queues ? @unique_queues.dup : @queues.shuffle.uniq

        # A suspension applies to a single attempt only: drop the suspended
        # queues from this attempt and forget them right away.
        @mutex.synchronize do
          unless @suspended.empty?
            candidates -= @suspended
            @suspended.clear
          end
        end

        unless candidates.empty?
          return Sidekiq.redis { |conn| conn.brpop(*candidates, Sidekiq::Fetcher::TIMEOUT) }
        end

        # Nothing to poll: wait as long as a blocking pop would have.
        sleep Sidekiq::Fetcher::TIMEOUT
        nil
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
sidekiq-throttled-0.2.0 lib/sidekiq/throttled/basic_fetch.rb