lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.0 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.1
- old
+ new
@@ -9,11 +9,15 @@
module Sidekiq
module Throttled
# Meta-strategy that couples {Concurrency} and {Threshold} strategies.
#
# @private
- class Strategy
+ class Strategy # rubocop:disable Metrics/ClassLength
# How a throttled job may be put back:
#   :enqueue  — push the job back onto the end of the queue immediately
#   :schedule — schedule the job for a later time when we expect capacity
VALID_VALUES_FOR_REQUEUE_WITH = %i[enqueue schedule].freeze
+
# @!attribute [r] concurrency
# @return [Strategy::Concurrency, nil]
attr_reader :concurrency
# @!attribute [r] threshold
@@ -22,18 +26,23 @@
# @!attribute [r] observer
# @return [Proc, nil]
attr_reader :observer
+ # @!attribute [r] requeue_options
+ # @return [Hash, nil]
+ attr_reader :requeue_options
+
# @param [#to_s] name
# @param [Hash] concurrency Concurrency options.
# See keyword args of {Strategy::Concurrency#initialize} for details.
# @param [Hash] threshold Threshold options.
# See keyword args of {Strategy::Threshold#initialize} for details.
# @param [#call] key_suffix Dynamic key suffix generator.
# @param [#call] observer Process called after throttled.
- def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil)
+ # @param [#call] requeue What to do with jobs that are throttled.
+ def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil, requeue: nil) # rubocop:disable Metrics/MethodLength, Metrics/ParameterLists
@observer = observer
@concurrency = StrategyCollection.new(concurrency,
strategy: Concurrency,
name: name,
@@ -42,11 +51,13 @@
@threshold = StrategyCollection.new(threshold,
strategy: Threshold,
name: name,
key_suffix: key_suffix)
- raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
+ @requeue_options = Throttled.config.default_requeue_options.merge(requeue || {})
+
+ validate!
end
# @return [Boolean] whenever strategy has dynamic config
def dynamic?
return true if @concurrency&.dynamic?
@@ -70,30 +81,34 @@
end
false
end
- # Return throttled job to be executed later. Implementation depends on the value of `with`:
- # :enqueue means put the job back at the end of the queue immediately
- # :schedule means schedule enqueueing the job for a later time when we expect to have capacity
- #
- # @param [#to_s, #call] with How to handle the throttled job
- # @param [#to_s, #call] to Name of the queue to re-queue the job to.
- # If not specified, will use the job's original queue.
# @return [Proc, Symbol] How to requeue the throttled job
def requeue_with
  # Same as `requeue_options[:with]`; nil default keeps missing-key behavior.
  requeue_options.fetch(:with, nil)
end
+
# @return [String, nil] Name of the queue to re-queue the job to.
def requeue_to
  # Same as `requeue_options[:to]`; nil default keeps missing-key behavior.
  requeue_options.fetch(:to, nil)
end
+
# Return throttled job to be executed later. Implementation depends on the strategy's `requeue` options.
# @return [void]
def requeue_throttled(work) # rubocop:disable Metrics/MethodLength
  # Resolve the :with option, calling it with the job's args if it is a Proc.
  args = JSON.parse(work.job)["args"]
  action = requeue_with
  action = action.call(*args) if action.respond_to?(:call)
  queue = calc_target_queue(work)

  if action == :enqueue
    # Put the job straight back onto the target queue.
    re_enqueue_throttled(work, queue)
  elsif action == :schedule
    # Find out when we will next be able to execute this job, and reschedule for then.
    reschedule_throttled(work, queue)
  else
    raise "unrecognized :with option #{action}"
  end
end
@@ -110,52 +125,61 @@
@threshold&.reset!
end
private
- def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength
- target = case to
# Validates the strategy configuration at construction time.
# @raise [ArgumentError] if :with is neither a callable nor a recognized symbol
# @raise [ArgumentError] if neither concurrency nor threshold limits were given
def validate!
  with = @requeue_options[:with]
  unless with.respond_to?(:call) || VALID_VALUES_FOR_REQUEUE_WITH.include?(with)
    raise ArgumentError, "requeue: #{with} is not a valid value for :with"
  end

  return if @concurrency.any? || @threshold.any?

  raise ArgumentError, "Neither :concurrency nor :threshold given"
end
+
# Resolves the queue the throttled job should be pushed to, normalized to
# the "queue:<name>" form. Falls back to the work's original queue when the
# :to option is nil or resolves to an empty value.
def calc_target_queue(work) # rubocop:disable Metrics/MethodLength
  to = requeue_to
  target =
    if to.is_a?(Proc) || to.is_a?(Method)
      # Dynamic target: call with the job's args.
      to.call(*JSON.parse(work.job)["args"])
    elsif to.nil?
      work.queue
    elsif to.is_a?(String) || to.is_a?(Symbol)
      to.to_s
    else
      raise ArgumentError, "Invalid argument for `to`"
    end

  target = work.queue if target.nil? || target.empty?
  target.start_with?("queue:") ? target : "queue:#{target}"
end
# Push the job back to the head of the queue.
def re_enqueue_throttled(work, target_queue)
  if work.class.name == "Sidekiq::Pro::SuperFetch::UnitOfWork"
    # SuperFetch: UnitOfWork#requeue removes the job from its temporary
    # private queue and pushes it back to the head of the target queue, so
    # the job won't be tried immediately after requeueing (in most cases).
    work.queue = target_queue if target_queue
    work.requeue
  else
    # Same operation Sidekiq performs upon `Sidekiq::Worker.perform_async`.
    Sidekiq.redis { |conn| conn.lpush(target_queue, work.job) }
  end
end
# Schedule the throttled job to run on `target_queue` at a later time.
# Returns false (and does nothing) when the payload has no job class.
def reschedule_throttled(work, target_queue)
  payload = JSON.parse(work.job)
  job_class = payload.fetch("wrapped") { payload.fetch("class") { return false } }
  job_args = payload["args"]

  # Re-enqueue the job to the target queue at another time as a NEW unit of
  # work, AND THEN mark this work as done so SuperFetch doesn't think this
  # instance is orphaned. Technically the job could run twice if the process
  # dies between the two calls, but jobs should be idempotent anyway — the
  # double-run risk already existed with SuperFetch and isn't really increased.
  Sidekiq::Client.enqueue_to_in(
    target_queue,
    retry_in(work),
    Object.const_get(job_class),
    *job_args
  )
  work.acknowledge
end
def retry_in(work)
message = JSON.parse(work.job)