Sha256: fb4e867faad421532c23180309c10089ba3651b1e42945bbab6c2a1363788ed7
Contents?: true
Size: 1.09 KB
Versions: 1
Compression:
Stored size: 1.09 KB
Contents
# frozen_string_literal: true require "sidekiq" require "sidekiq/throttled/unit_of_work" module Sidekiq module Throttled # Throttled fetch strategy. class Fetch TIMEOUT = 2 private_constant :TIMEOUT def initialize(options) @strictly_ordered_queues = options[:strict] @queues = options[:queues].map { |q| "queue:#{q}" } @queues.uniq! if @strictly_ordered_queues end # @return [Sidekiq::Throttled::UnitOfWork, nil] def retrieve_work work = brpop return unless work work = UnitOfWork.new(*work) return work unless Throttled.throttled? work.job Sidekiq.redis do |conn| conn.lpush("queue:#{work.queue_name}", work.job) end nil end private # Tries to pop pair of `queue` and job `message` out of sidekiq queue. # @return [Array<String, String>, nil] def brpop Sidekiq.redis { |conn| conn.brpop(*queues, TIMEOUT) } end def queues (@strictly_ordered_queues ? @queues : @queues.shuffle.uniq) end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
sidekiq-throttled-0.5.0 | lib/sidekiq/throttled/fetch.rb |