lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.4.0 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.0
- old
+ new
@@ -29,11 +29,11 @@
# See keyword args of {Strategy::Concurrency#initialize} for details.
# @param [Hash] threshold Threshold options.
# See keyword args of {Strategy::Threshold#initialize} for details.
# @param [#call] key_suffix Dynamic key suffix generator.
# @param [#call] observer Process called after the job has been throttled.
- def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) # rubocop:disable Metrics/MethodLength
+ def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil)
@observer = observer
@concurrency = StrategyCollection.new(concurrency,
strategy: Concurrency,
name: name,
@@ -42,13 +42,11 @@
@threshold = StrategyCollection.new(threshold,
strategy: Threshold,
name: name,
key_suffix: key_suffix)
- return if @concurrency.any? || @threshold.any?
-
- raise ArgumentError, "Neither :concurrency nor :threshold given"
+ raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
end
# @return [Boolean] whether the strategy has dynamic config
def dynamic?
return true if @concurrency&.dynamic?
@@ -72,20 +70,113 @@
end
false
end
+ # Return throttled job to be executed later. Implementation depends on the value of `with`:
+ # :enqueue means put the job back at the end of the queue immediately
+ # :schedule means schedule enqueueing the job for a later time when we expect to have capacity
+ #
+ # @param [#to_s, #call] with How to handle the throttled job
+ # @param [#to_s, #call] to Name of the queue to re-queue the job to.
+ # If not specified, will use the job's original queue.
+ # @return [void]
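+ # @example Illustrative call only; `strategy`, `work`, and the queue name are hypothetical
+ #   strategy.requeue_throttled(work, with: :enqueue)
+ #   strategy.requeue_throttled(work, with: :schedule, to: :low_priority)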
+ def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength
+ # Resolve :with and :to arguments, calling them if they are Procs
+ job_args = JSON.parse(work.job)["args"]
+ requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with
+ target_queue = calc_target_queue(work, to)
+
+ case requeue_with
+ when :enqueue
+ re_enqueue_throttled(work, target_queue)
+ when :schedule
+ # Find out when we will next be able to execute this job, and reschedule for then.
+ reschedule_throttled(work, requeue_to: target_queue)
+ else
+ raise "unrecognized :with option #{with}"
+ end
+ end
+
# Marks job as being processed.
# @return [void]
def finalize!(jid, *job_args)
@concurrency&.finalize!(jid, *job_args)
end
- # Resets count of jobs of all avaliable strategies
+ # Resets count of jobs of all available strategies
# @return [void]
def reset!
@concurrency&.reset!
@threshold&.reset!
+ end
+
+ private
+
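+ # Resolves the `to:` argument (string, symbol, proc, or nil) into a fully
+ # qualified Redis list name such as "queue:default", falling back to the
+ # job's original queue when no explicit target is given.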
+ def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength
+ target = case to
+ when Proc, Method
+ to.call(*JSON.parse(work.job)["args"])
+ when NilClass
+ work.queue
+ when String, Symbol
+ to.to_s
+ else
+ raise ArgumentError, "Invalid argument for `to`"
+ end
+
+ target = work.queue if target.nil? || target.empty?
+
+ target.start_with?("queue:") ? target : "queue:#{target}"
+ end
+
+ # Push the job back to the head of the queue.
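+ # (Sidekiq's basic fetch BRPOPs from the tail of the list, so a job LPUSHed to the
+ # head waits behind jobs that are already queued.)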
+ def re_enqueue_throttled(work, requeue_to)
+ case work.class.name
+ when "Sidekiq::Pro::SuperFetch::UnitOfWork"
+ # Calls SuperFetch UnitOfWork's requeue to remove the job from the
+ # temporary queue and push job back to the head of the target queue, so that
+ # the job won't be tried immediately after it was requeued (in most cases).
+ work.queue = requeue_to if requeue_to
+ work.requeue
+ else
+ # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
+ Sidekiq.redis { |conn| conn.lpush(requeue_to, work.job) }
+ end
+ end
+
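+ # Pushes the job onto Sidekiq's scheduled set (via `Sidekiq::Client.enqueue_to_in`)
+ # so it is enqueued again once the throttle is expected to have capacity,
+ # then acknowledges the original unit of work.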
+ def reschedule_throttled(work, requeue_to:)
+ message = JSON.parse(work.job)
+ job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
+ job_args = message["args"]
+
+ # Re-enqueue the job to the target queue at another time as a NEW unit of work
+ # AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned
+ # Technically, the job could be processed twice if the process dies between the two lines,
+ # but your job should be idempotent anyway, right?
+ # Running twice was already a risk with SuperFetch, and this doesn't really increase that risk.
+ Sidekiq::Client.enqueue_to_in(requeue_to, retry_in(work), Object.const_get(job_class), *job_args)
+ work.acknowledge
+ end
+
+ def retry_in(work)
+ message = JSON.parse(work.job)
+ jid = message.fetch("jid") { return false }
+ job_args = message["args"]
+
+ # Ask both concurrency and threshold, if relevant, how long minimum until we can retry.
+ # If we get two answers, take the longer one.
+ intervals = [@concurrency&.retry_in(jid, *job_args), @threshold&.retry_in(*job_args)].compact
+
+ raise "Cannot compute a valid retry interval" if intervals.empty?
+
+ interval = intervals.max
+
+ # Add a random amount of jitter, proportional to the length of the minimum retry time.
+ # This helps spread out jobs more evenly and avoid clumps of jobs on the queue.
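+ # For example, a 60 second base interval gains `rand(12)` extra seconds of jitter.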
+ interval += rand(interval / 5) if interval > 10
+
+ interval
end
end
end
end