Sha256: 4ee65d210af7df29612bffc323654fdf4c0a3b8eba6a120c9a21a022aed4d0cc

Contents?: true

Size: 1.9 KB

Versions: 2

Compression:

Stored size: 1.9 KB

Contents

require 'forwardable'
require 'sidekiq'
require 'sidekiq/manager'
require 'sidekiq/api'

module Sidekiq::LimitFetch
  autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work'

  require_relative 'limit_fetch/instances'
  require_relative 'limit_fetch/queues'
  require_relative 'limit_fetch/global/semaphore'
  require_relative 'limit_fetch/global/selector'
  require_relative 'limit_fetch/global/monitor'
  require_relative 'extensions/queue'
  require_relative 'extensions/manager'

  TIMEOUT = Sidekiq::BasicFetch::TIMEOUT

  extend self

  def new(_)
    self
  end

  def retrieve_work
    queue, job = redis_brpop(Queues.acquire)
    Queues.release_except(queue)
    UnitOfWork.new(queue, job) if job
  end

  def config
    # Post 6.5, Sidekiq.options is deprecated and replaced with passing Sidekiq directly
    post_6_5? ? Sidekiq : Sidekiq.options
  end

  # Backwards compatibility for sidekiq v6.1.0
  # @see https://github.com/mperham/sidekiq/pull/4602
  def bulk_requeue(*args)
    if Sidekiq::BasicFetch.respond_to?(:bulk_requeue) # < 6.1.0
      Sidekiq::BasicFetch.bulk_requeue(*args)
    else # 6.1.0+
      Sidekiq::BasicFetch.new(config).bulk_requeue(*args)
    end
  end

  def redis_retryable
    yield
  rescue Redis::BaseConnectionError
    sleep TIMEOUT
    retry
  rescue Redis::CommandError => error
    # If Redis was restarted and is still loading its snapshot,
    # then we should treat this as a temporary connection error too.
    if error.message =~ /^LOADING/
      sleep TIMEOUT
      retry
    else
      raise
    end
  end

  private

  def post_6_5?
    @post_6_5 ||= Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new('6.5.0')
  end

  def redis_brpop(queues)
    if queues.empty?
      sleep TIMEOUT  # there are no queues to handle, so lets sleep
      []             # and return nothing
    else
      redis_retryable { Sidekiq.redis { |it| it.brpop *queues, timeout: TIMEOUT } }
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
sidekiq-limit_fetch-4.3.2 lib/sidekiq/limit_fetch.rb
sidekiq-limit_fetch-4.3.1 lib/sidekiq/limit_fetch.rb