Sha256: 4ee65d210af7df29612bffc323654fdf4c0a3b8eba6a120c9a21a022aed4d0cc
Contents?: true
Size: 1.9 KB
Versions: 2
Compression:
Stored size: 1.9 KB
Contents
require 'forwardable' require 'sidekiq' require 'sidekiq/manager' require 'sidekiq/api' module Sidekiq::LimitFetch autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work' require_relative 'limit_fetch/instances' require_relative 'limit_fetch/queues' require_relative 'limit_fetch/global/semaphore' require_relative 'limit_fetch/global/selector' require_relative 'limit_fetch/global/monitor' require_relative 'extensions/queue' require_relative 'extensions/manager' TIMEOUT = Sidekiq::BasicFetch::TIMEOUT extend self def new(_) self end def retrieve_work queue, job = redis_brpop(Queues.acquire) Queues.release_except(queue) UnitOfWork.new(queue, job) if job end def config # Post 6.5, Sidekiq.options is deprecated and replaced with passing Sidekiq directly post_6_5? ? Sidekiq : Sidekiq.options end # Backwards compatibility for sidekiq v6.1.0 # @see https://github.com/mperham/sidekiq/pull/4602 def bulk_requeue(*args) if Sidekiq::BasicFetch.respond_to?(:bulk_requeue) # < 6.1.0 Sidekiq::BasicFetch.bulk_requeue(*args) else # 6.1.0+ Sidekiq::BasicFetch.new(config).bulk_requeue(*args) end end def redis_retryable yield rescue Redis::BaseConnectionError sleep TIMEOUT retry rescue Redis::CommandError => error # If Redis was restarted and is still loading its snapshot, # then we should treat this as a temporary connection error too. if error.message =~ /^LOADING/ sleep TIMEOUT retry else raise end end private def post_6_5? @post_6_5 ||= Gem::Version.new(Sidekiq::VERSION) >= Gem::Version.new('6.5.0') end def redis_brpop(queues) if queues.empty? sleep TIMEOUT # there are no queues to handle, so lets sleep [] # and return nothing else redis_retryable { Sidekiq.redis { |it| it.brpop *queues, timeout: TIMEOUT } } end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
sidekiq-limit_fetch-4.3.2 | lib/sidekiq/limit_fetch.rb |
sidekiq-limit_fetch-4.3.1 | lib/sidekiq/limit_fetch.rb |