Sha256: 6c6e416c54f0d9031b79773a9eba75aac7721f59ac6788977d4d54a45c3a5021
Contents?: true
Size: 1.3 KB
Versions: 7
Compression:
Stored size: 1.3 KB
Contents
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/api'
require 'forwardable'

# Fetch strategy for Sidekiq that pulls work only from the queues handed out
# by its +Queues+ collaborator. Loading this file registers the class as
# +Sidekiq.options[:fetch]+.
class Sidekiq::LimitFetch
  autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work'

  # These must be loaded before `include Redis` below so the constant is
  # available when the class body reaches it.
  require_relative 'limit_fetch/redis'
  require_relative 'limit_fetch/singleton'
  require_relative 'limit_fetch/queues'
  require_relative 'limit_fetch/global/semaphore'
  require_relative 'limit_fetch/global/selector'
  require_relative 'limit_fetch/global/monitor'
  require_relative 'extensions/queue'

  include Redis

  # Install this class as the fetch option at load time.
  Sidekiq.options[:fetch] = self

  # Forwards every argument unchanged to Sidekiq::BasicFetch.bulk_requeue.
  def self.bulk_requeue(*args)
    Sidekiq::BasicFetch.bulk_requeue(*args)
  end

  # Starts the global monitor and builds the queue set.
  #
  # @param options [Hash] fetch options; merged with a :namespace entry
  #   before being handed to Queues.new
  def initialize(options)
    Global::Monitor.start!
    # determine_namespace is not defined in this file; presumably it comes
    # from the included Redis module -- TODO confirm.
    @queues = Queues.new(options.merge(namespace: determine_namespace))
  end

  # Fetches one message and wraps it in a unit of work.
  #
  # @return [UnitOfWork, nil] nil when no message was fetched
  def retrieve_work
    queue, message = fetch_message
    UnitOfWork.new(queue, message) if message
  end

  private

  # Acquires queues, pops from them, and then releases every acquired queue
  # except the one that produced the result.
  #
  # @return [Array, nil] whatever redis_brpop returned; retrieve_work
  #   destructures it as (queue, message)
  def fetch_message
    popped = redis_brpop(*@queues.acquire, Sidekiq::Fetcher::TIMEOUT)
    # `queue` stays nil when nothing was popped, and also when an exception
    # is raised before this assignment; the ensure clause then passes nil.
    queue, = popped
    popped
  ensure
    @queues.release_except(queue)
  end

  # Issues BRPOP with the given arguments.
  #
  # @param args [Array] queue names followed by the timeout, as assembled
  #   in fetch_message
  # @return [Object, nil] the result of the brpop call, or nil when only the
  #   timeout was passed (no queue names)
  def redis_brpop(*args)
    # NOTE(review): this returns immediately instead of waiting. If the
    # caller re-polls right away while no queue can be acquired, that may
    # spin -- confirm against the fetcher loop.
    return if args.size < 2

    query = ->(redis) { redis.brpop(*args) }

    # Use the plain `redis` helper only when every locally busy queue is
    # part of this call; otherwise go through `nonblocking_redis`. Both
    # helpers are defined outside this file (presumably in the included
    # Redis module -- TODO confirm).
    if busy_local_queues.all? { |queue| args.include?(queue.rname) }
      redis(&query)
    else
      nonblocking_redis(&query)
    end
  end

  # @return [Array] the Sidekiq::Queue instances that answer true to
  #   local_busy?
  def busy_local_queues
    Sidekiq::Queue.instances.select(&:local_busy?)
  end
end
Version data entries
7 entries across 7 versions & 1 rubygems