Sha256: 6c6e416c54f0d9031b79773a9eba75aac7721f59ac6788977d4d54a45c3a5021

Contents?: true

Size: 1.3 KB

Versions: 7

Compression:

Stored size: 1.3 KB

Contents

require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/api'
require 'forwardable'

# Fetch strategy for Sidekiq that pulls work through a set of limit-aware
# queues. Loading this file registers the class as Sidekiq's +:fetch+ option.
class Sidekiq::LimitFetch
  autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work'

  require_relative 'limit_fetch/redis'
  require_relative 'limit_fetch/singleton'
  require_relative 'limit_fetch/queues'
  require_relative 'limit_fetch/global/semaphore'
  require_relative 'limit_fetch/global/selector'
  require_relative 'limit_fetch/global/monitor'
  require_relative 'extensions/queue'

  include Redis

  # Load-time side effect: make Sidekiq use this class for fetching.
  Sidekiq.options[:fetch] = self

  # Hands requeueing over to Sidekiq::BasicFetch, forwarding every argument
  # untouched.
  def self.bulk_requeue(*args)
    Sidekiq::BasicFetch.bulk_requeue(*args)
  end

  # Starts the global monitor, then builds the queue set from +options+
  # extended with a +:namespace+ entry.
  #
  # NOTE(review): +determine_namespace+ is not defined in this file;
  # presumably it is supplied by the included Redis module -- confirm.
  #
  # @param options [Hash] fetch options passed on to Queues.new
  def initialize(options)
    Global::Monitor.start!
    queue_options = options.merge(namespace: determine_namespace)
    @queues = Queues.new(queue_options)
  end

  # Fetches one message and wraps it together with its queue.
  #
  # @return [UnitOfWork, nil] nil when no message was fetched
  def retrieve_work
    source_queue, payload = fetch_message
    return unless payload

    UnitOfWork.new(source_queue, payload)
  end

  private

  # Acquires queues, pops from them with Sidekiq's fetch timeout, and always
  # (even on error) releases every acquired queue except the one that
  # produced a message.
  #
  # The multiple assignment is the last expression of the body, so the method
  # returns whatever +redis_brpop+ returned (a pair or nil).
  def fetch_message
    acquired = @queues.acquire
    served_queue, _payload = redis_brpop(*acquired, Sidekiq::Fetcher::TIMEOUT)
  ensure
    # +served_queue+ is nil when nothing was popped or an error was raised.
    @queues.release_except(served_queue)
  end

  # Runs BRPOP with +args+ (as called above: queue names followed by the
  # timeout). Returns nil without touching Redis when fewer than two
  # arguments are given.
  #
  # When some locally busy queue is not among +args+, the query goes through
  # +nonblocking_redis+ instead of +redis+.
  # NOTE(review): both helpers are defined outside this file; presumably
  # the Redis mixin -- confirm.
  def redis_brpop(*args)
    return if args.size < 2

    pop = ->(connection) { connection.brpop(*args) }
    busy_elsewhere = busy_local_queues.any? { |busy| !args.include?(busy.rname) }

    return nonblocking_redis(&pop) if busy_elsewhere

    redis(&pop)
  end

  # Every known Sidekiq::Queue instance that reports itself as locally busy.
  def busy_local_queues
    Sidekiq::Queue.instances.select { |candidate| candidate.local_busy? }
  end
end

Version data entries

7 entries across 7 versions & 1 rubygem

Version Path
sidekiq-limit_fetch-2.2.7 lib/sidekiq/limit_fetch.rb
sidekiq-limit_fetch-2.2.6 lib/sidekiq/limit_fetch.rb
sidekiq-limit_fetch-2.2.5 lib/sidekiq/limit_fetch.rb
sidekiq-limit_fetch-2.2.4 lib/sidekiq/limit_fetch.rb
sidekiq-limit_fetch-2.2.3 lib/sidekiq/limit_fetch.rb
sidekiq-limit_fetch-2.2.2 lib/sidekiq/limit_fetch.rb
sidekiq-limit_fetch-2.2.1 lib/sidekiq/limit_fetch.rb