Sha256: 899fadd9a20759637919534d3fece83130c8518533b1c62c5b5faad008bb967a

Contents?: true

Size: 1.28 KB

Versions: 3

Compression:

Stored size: 1.28 KB

Contents

# frozen_string_literal: true

require "celluloid" if Sidekiq::VERSION < "4.0.0"
require "sidekiq"
require "sidekiq/fetch"

module Sidekiq
  module Throttled
    # Throttled version of `Sidekiq::BasicFetch` fetcher strategy.
    class BasicFetch < ::Sidekiq::BasicFetch
      TIMEOUT = 2

      class UnitOfWork < ::Sidekiq::BasicFetch::UnitOfWork
        alias job message if Sidekiq::VERSION < "4.0.0"
      end

      def initialize(options)
        @strictly_ordered_queues = (options[:strict] ? true : false)
        @queues = options[:queues].map { |q| "queue:#{q}" }
        @queues.uniq! if @strictly_ordered_queues
      end

      # @return [Sidekiq::BasicFetch::UnitOfWork, nil]
      def retrieve_work
        work = brpop
        return unless work

        work = UnitOfWork.new(*work)
        return work unless Throttled.throttled? work.job

        queue = "queue:#{work.queue_name}"

        Sidekiq.redis { |conn| conn.lpush(queue, work.job) }

        nil
      end

      private

      # Tries to pop pair of `queue` and job `message` out of sidekiq queue.
      # @return [Array<String, String>, nil]
      def brpop
        queues = (@strictly_ordered_queues ? @queues : @queues.shuffle.uniq)
        Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) }
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
sidekiq-throttled-0.4.1 lib/sidekiq/throttled/basic_fetch.rb
sidekiq-throttled-0.4.0 lib/sidekiq/throttled/basic_fetch.rb
sidekiq-throttled-0.3.2 lib/sidekiq/throttled/basic_fetch.rb