lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.0 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.1

- old
+ new

@@ -9,11 +9,15 @@ module Sidekiq module Throttled # Meta-strategy that couples {Concurrency} and {Threshold} strategies. # # @private - class Strategy + class Strategy # rubocop:disable Metrics/ClassLength + # :enqueue means put the job back at the end of the queue immediately + # :schedule means schedule enqueueing the job for a later time when we expect to have capacity + VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze + # @!attribute [r] concurrency # @return [Strategy::Concurrency, nil] attr_reader :concurrency # @!attribute [r] threshold @@ -22,18 +26,23 @@ # @!attribute [r] observer # @return [Proc, nil] attr_reader :observer + # @!attribute [r] requeue_options + # @return [Hash, nil] + attr_reader :requeue_options + # @param [#to_s] name # @param [Hash] concurrency Concurrency options. # See keyword args of {Strategy::Concurrency#initialize} for details. # @param [Hash] threshold Threshold options. # See keyword args of {Strategy::Threshold#initialize} for details. # @param [#call] key_suffix Dynamic key suffix generator. # @param [#call] observer Process called after throttled. - def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) + # @param [#call] requeue What to do with jobs that are throttled. + def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) # rubocop:disable Metrics/MethodLength, Metrics/ParameterLists @observer = observer @concurrency = StrategyCollection.new(concurrency, strategy: Concurrency, name: name, @@ -42,11 +51,13 @@ @threshold = StrategyCollection.new(threshold, strategy: Threshold, name: name, key_suffix: key_suffix) - raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any? + @requeue_options = Throttled.config.default_requeue_options.merge(requeue || {}) + + validate! end # @return [Boolean] whenever strategy has dynamic config def dynamic? return true if @concurrency&.dynamic? @@ -70,30 +81,34 @@ end false end - # Return throttled job to be executed later. Implementation depends on the value of `with`: - # :enqueue means put the job back at the end of the queue immediately - # :schedule means schedule enqueueing the job for a later time when we expect to have capacity - # - # @param [#to_s, #call] with How to handle the throttled job - # @param [#to_s, #call] to Name of the queue to re-queue the job to. - # If not specified, will use the job's original queue. + # @return [Proc, Symbol] How to requeue the throttled job + def requeue_with + requeue_options[:with] + end + + # @return [String, nil] Name of the queue to re-queue the job to. + def requeue_to + requeue_options[:to] + end + + # Return throttled job to be executed later. Implementation depends on the strategy's `requeue` options. # @return [void] - def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength - # Resolve :with and :to arguments, calling them if they are Procs + def requeue_throttled(work) # rubocop:disable Metrics/MethodLength + # Resolve :with and :to options, calling them if they are Procs job_args = JSON.parse(work.job)["args"] - requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with - target_queue = calc_target_queue(work, to) + with = requeue_with.respond_to?(:call) ? requeue_with.call(*job_args) : requeue_with + target_queue = calc_target_queue(work) - case requeue_with + case with when :enqueue re_enqueue_throttled(work, target_queue) when :schedule # Find out when we will next be able to execute this job, and reschedule for then. - reschedule_throttled(work, requeue_to: target_queue) + reschedule_throttled(work, target_queue) else raise "unrecognized :with option #{with}" end end @@ -110,52 +125,61 @@ @threshold&.reset! end private - def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength - target = case to + def validate! + unless VALID_VALUES_FOR_REQUEUE_WITH.include?(@requeue_options[:with]) || + @requeue_options[:with].respond_to?(:call) + raise ArgumentError, "requeue: #{@requeue_options[:with]} is not a valid value for :with" + end + + raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any? + end + + def calc_target_queue(work) # rubocop:disable Metrics/MethodLength + target = case requeue_to when Proc, Method - to.call(*JSON.parse(work.job)["args"]) + requeue_to.call(*JSON.parse(work.job)["args"]) when NilClass work.queue when String, Symbol - to.to_s + requeue_to.to_s else raise ArgumentError, "Invalid argument for `to`" end target = work.queue if target.nil? || target.empty? target.start_with?("queue:") ? target : "queue:#{target}" end # Push the job back to the head of the queue. - def re_enqueue_throttled(work, requeue_to) + def re_enqueue_throttled(work, target_queue) case work.class.name when "Sidekiq::Pro::SuperFetch::UnitOfWork" # Calls SuperFetch UnitOfWork's requeue to remove the job from the # temporary queue and push job back to the head of the target queue, so that # the job won't be tried immediately after it was requeued (in most cases). - work.queue = requeue_to if requeue_to + work.queue = target_queue if target_queue work.requeue else # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call. - Sidekiq.redis { |conn| conn.lpush(requeue_to, work.job) } + Sidekiq.redis { |conn| conn.lpush(target_queue, work.job) } end end - def reschedule_throttled(work, requeue_to:) + def reschedule_throttled(work, target_queue) message = JSON.parse(work.job) job_class = message.fetch("wrapped") { message.fetch("class") { return false } } job_args = message["args"] # Re-enqueue the job to the target queue at another time as a NEW unit of work # AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned # Technically, the job could processed twice if the process dies between the two lines, # but your job should be idempotent anyway, right? # The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk. - Sidekiq::Client.enqueue_to_in(requeue_to, retry_in(work), Object.const_get(job_class), *job_args) + Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args) work.acknowledge end def retry_in(work) message = JSON.parse(work.job)