Sha256: 092a14bec9367e255ce0ec8907c191aa624e2015cab59e85e0edb0ded2cd2cf7

Contents?: true

Size: 1.59 KB

Versions: 1

Compression:

Stored size: 1.59 KB

Contents

# frozen_string_literal: true

require "sidekiq"
require "sidekiq/fetch"

module Sidekiq
  module Throttled
    module Patches
      module BasicFetch
        class << self
          def apply!
            Sidekiq::BasicFetch.prepend(self) unless Sidekiq::BasicFetch.include?(self)
          end
        end

        # Retrieves job from redis.
        #
        # @return [Sidekiq::Throttled::UnitOfWork, nil]
        def retrieve_work
          work = super

          if work && Throttled.throttled?(work.job)
            requeue_throttled(work)
            return nil
          end

          work
        end

        private

        # Pushes job back to the head of the queue, so that job won't be tried
        # immediately after it was requeued (in most cases).
        #
        # @note This is triggered when job is throttled. So it is same operation
        #   Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
        #
        # @return [void]
        def requeue_throttled(work)
          redis { |conn| conn.lpush(work.queue, work.job) }
        end

        # Returns list of queues to try to fetch jobs from.
        #
        # @note It may return an empty array.
        # @param [Array<String>] queues
        # @return [Array<String>]
        def queues_cmd
          queues = super

          # TODO: Refactor to be prepended as an integration mixin during configuration stage
          #   Or via configurable queues reducer
          queues -= Sidekiq::Pauzer.paused_queues.map { |name| "queue:#{name}" } if defined?(Sidekiq::Pauzer)

          queues
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
sidekiq-throttled-1.0.0.alpha.1 lib/sidekiq/throttled/patches/basic_fetch.rb