Sha256: a92de68ff50a92933cca6e658c907afed3fe6915ee66040b6fdcf36a08c233bd

Contents?: true

Size: 1.38 KB

Versions: 2

Compression:

Stored size: 1.38 KB

Contents

require 'forwardable'
require 'sidekiq'
require 'sidekiq/manager'
require 'sidekiq/api'

module Sidekiq::LimitFetch
  autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work'

  require_relative 'limit_fetch/instances'
  require_relative 'limit_fetch/queues'
  require_relative 'limit_fetch/global/semaphore'
  require_relative 'limit_fetch/global/selector'
  require_relative 'limit_fetch/global/monitor'
  require_relative 'extensions/queue'
  require_relative 'extensions/manager'

  extend self

  def new(_)
    self
  end

  def retrieve_work
    queue, job = redis_brpop(Queues.acquire)
    Queues.release_except(queue)
    UnitOfWork.new(queue, job) if job
  end

  # Backwards compatibility for sidekiq v6.1.0
  # @see https://github.com/mperham/sidekiq/pull/4602
  def bulk_requeue(*args)
    if Sidekiq::BasicFetch.respond_to?(:bulk_requeue) # < 6.1.0
      Sidekiq::BasicFetch.bulk_requeue(*args)
    else # 6.1.0+
      Sidekiq::BasicFetch.new(Sidekiq.options).bulk_requeue(*args)
    end
  end

  def redis_retryable
    yield
  rescue Redis::BaseConnectionError
    sleep 1
    retry
  end

  private

  TIMEOUT = Sidekiq::BasicFetch::TIMEOUT

  def redis_brpop(queues)
    if queues.empty?
      sleep TIMEOUT  # there are no queues to handle, so lets sleep
      []             # and return nothing
    else
      redis_retryable { Sidekiq.redis { |it| it.brpop *queues, TIMEOUT } }
    end
  end
end

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
sidekiq-limit_fetch-4.0.0 lib/sidekiq/limit_fetch.rb
rcgt-sidekiq-limit_fetch-3.4.1 lib/sidekiq/limit_fetch.rb