Sha256: 6e65a540930ef4619dbd85b9292f4527c91f221d31a14a067ede7498aeea6791

Contents?: true

Size: 1.53 KB

Versions: 1

Compression:

Stored size: 1.53 KB

Contents

# frozen_string_literal: true

require "sidekiq"
require "sidekiq/throttled/unit_of_work"
require "sidekiq/throttled/queues_pauser"
require "sidekiq/throttled/queue_name"

module Sidekiq
  module Throttled
    # Fetch strategy that hands a job to the caller only when it is not
    # throttled; a throttled job is pushed back onto its queue instead.
    #
    # @private
    class Fetch
      # Seconds given to `BRPOP` as its timeout, and slept when there is no
      # queue left to poll.
      TIMEOUT = 2
      private_constant :TIMEOUT

      # Builds a fetcher from the given options.
      #
      # @param options [Hash] reads `:strict` and `:queues`; every entry of
      #   `:queues` is passed through `QueueName.expand`
      def initialize(options)
        @strict = options[:strict]

        expanded = options[:queues].map { |name| QueueName.expand(name) }

        # In strict mode duplicates are dropped once, up front; otherwise they
        # are kept and collapsed on every poll after shuffling.
        @queues = @strict ? expanded.uniq : expanded
      end

      # Pops one job and returns it unless it is throttled.
      #
      # @return [Sidekiq::Throttled::UnitOfWork, nil] the unit of work, or
      #   `nil` when nothing was popped or the popped job is throttled
      def retrieve_work
        popped = brpop
        return nil unless popped

        unit = UnitOfWork.new(*popped)

        if Throttled.throttled?(unit.job)
          push_back(unit)
          nil
        else
          unit
        end
      end

      private

      # Puts the job of a throttled unit of work back onto its queue.
      #
      # @param unit [Sidekiq::Throttled::UnitOfWork]
      # @return [void]
      def push_back(unit)
        Sidekiq.redis do |redis|
          redis.lpush(QueueName.expand(unit.queue_name), unit.job)
        end
      end

      # Attempts to pop a `queue` / job `message` pair from the sidekiq
      # queues. When there is no queue to poll, sleeps for `TIMEOUT` seconds
      # and gives up.
      #
      # @see http://redis.io/commands/brpop
      # @return [Array(String, String), nil]
      def brpop
        targets = build_queues_list

        unless targets.empty?
          return Sidekiq.redis { |redis| redis.brpop(*targets, TIMEOUT) }
        end

        sleep TIMEOUT
        nil
      end

      # Lists the queues to poll, as filtered by the queues pauser. In strict
      # mode the configured order is kept; otherwise the list is shuffled and
      # de-duplicated on each call.
      #
      # @note The result may be an empty array.
      # @return [Array<String>]
      def build_queues_list
        pauser = QueuesPauser.instance
        return pauser.filter(@queues) if @strict

        pauser.filter(@queues.shuffle.uniq)
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
sidekiq-throttled-0.6.1 lib/sidekiq/throttled/fetch.rb