lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.4.0 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.0

- old
+ new

@@ -29,11 +29,11 @@
       #   See keyword args of {Strategy::Concurrency#initialize} for details.
       # @param [Hash] threshold Threshold options.
       #   See keyword args of {Strategy::Threshold#initialize} for details.
       # @param [#call] key_suffix Dynamic key suffix generator.
       # @param [#call] observer Process called after throttled.
-      def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil) # rubocop:disable Metrics/MethodLength
+      def initialize(name, concurrency: nil, threshold: nil, key_suffix: nil, observer: nil)
         @observer = observer

         @concurrency = StrategyCollection.new(concurrency,
           strategy:   Concurrency,
           name:       name,
@@ -42,13 +42,11 @@
         @threshold = StrategyCollection.new(threshold,
           strategy:   Threshold,
           name:       name,
           key_suffix: key_suffix)

-        return if @concurrency.any? || @threshold.any?
-
-        raise ArgumentError, "Neither :concurrency nor :threshold given"
+        raise ArgumentError, "Neither :concurrency nor :threshold given" unless @concurrency.any? || @threshold.any?
       end

       # @return [Boolean] whenever strategy has dynamic config
       def dynamic?
         return true if @concurrency&.dynamic?
@@ -72,20 +70,113 @@
         end

         false
       end

+      # Return throttled job to be executed later. Implementation depends on the value of `with`:
+      #   :enqueue means put the job back at the end of the queue immediately
+      #   :schedule means schedule enqueueing the job for a later time when we expect to have capacity
+      #
+      # @param [#to_s, #call] with How to handle the throttled job
+      # @param [#to_s, #call] to Name of the queue to re-queue the job to.
+      #   If not specified, will use the job's original queue.
+      # @return [void]
+      def requeue_throttled(work, with:, to: nil) # rubocop:disable Metrics/MethodLength
+        # Resolve :with and :to arguments, calling them if they are Procs
+        job_args = JSON.parse(work.job)["args"]
+        requeue_with = with.respond_to?(:call) ? with.call(*job_args) : with
+        target_queue = calc_target_queue(work, to)
+
+        case requeue_with
+        when :enqueue
+          re_enqueue_throttled(work, target_queue)
+        when :schedule
+          # Find out when we will next be able to execute this job, and reschedule for then.
+          reschedule_throttled(work, requeue_to: target_queue)
+        else
+          raise "unrecognized :with option #{with}"
+        end
+      end
+
       # Marks job as being processed.
       # @return [void]
       def finalize!(jid, *job_args)
         @concurrency&.finalize!(jid, *job_args)
       end

-      # Resets count of jobs of all avaliable strategies
+      # Resets count of jobs of all available strategies
       # @return [void]
       def reset!
         @concurrency&.reset!
         @threshold&.reset!
+      end
+
+      private
+
+      def calc_target_queue(work, to) # rubocop:disable Metrics/MethodLength
+        target = case to
+                 when Proc, Method
+                   to.call(*JSON.parse(work.job)["args"])
+                 when NilClass
+                   work.queue
+                 when String, Symbol
+                   to.to_s
+                 else
+                   raise ArgumentError, "Invalid argument for `to`"
+                 end
+
+        target = work.queue if target.nil? || target.empty?
+
+        target.start_with?("queue:") ? target : "queue:#{target}"
+      end
+
+      # Push the job back to the head of the queue.
+      def re_enqueue_throttled(work, requeue_to)
+        case work.class.name
+        when "Sidekiq::Pro::SuperFetch::UnitOfWork"
+          # Calls SuperFetch UnitOfWork's requeue to remove the job from the
+          # temporary queue and push job back to the head of the target queue, so that
+          # the job won't be tried immediately after it was requeued (in most cases).
+          work.queue = requeue_to if requeue_to
+          work.requeue
+        else
+          # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
+          Sidekiq.redis { |conn| conn.lpush(requeue_to, work.job) }
+        end
+      end
+
+      def reschedule_throttled(work, requeue_to:)
+        message = JSON.parse(work.job)
+        job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
+        job_args = message["args"]
+
+        # Re-enqueue the job to the target queue at another time as a NEW unit of work
+        # AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned
+        # Technically, the job could processed twice if the process dies between the two lines,
+        # but your job should be idempotent anyway, right?
+        # The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk.
+        Sidekiq::Client.enqueue_to_in(requeue_to, retry_in(work), Object.const_get(job_class), *job_args)
+        work.acknowledge
+      end
+
+      def retry_in(work)
+        message = JSON.parse(work.job)
+        jid = message.fetch("jid") { return false }
+        job_args = message["args"]
+
+        # Ask both concurrency and threshold, if relevant, how long minimum until we can retry.
+        # If we get two answers, take the longer one.
+        intervals = [@concurrency&.retry_in(jid, *job_args), @threshold&.retry_in(*job_args)].compact
+
+        raise "Cannot compute a valid retry interval" if intervals.empty?
+
+        interval = intervals.max
+
+        # Add a random amount of jitter, proportional to the length of the minimum retry time.
+        # This helps spread out jobs more evenly and avoid clumps of jobs on the queue.
+        interval += rand(interval / 5) if interval > 10
+
+        interval
       end
     end
   end
 end
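For illustration only, here is a minimal Ruby sketch (not part of the gem's code above) of how the new Strategy#requeue_throttled API could be exercised directly. The FakeWork struct, the MyJob class name, and the "sketch"/low_priority names are hypothetical stand-ins introduced for this example; FakeWork only mimics the interface the diff relies on (#job returning the JSON payload string, #queue, #acknowledge), and running the sketch assumes Sidekiq can reach a Redis instance.

require "json"
require "sidekiq"
require "sidekiq/throttled"

# Hypothetical stand-in for a fetcher's unit of work.
FakeWork = Struct.new(:job, :queue) do
  def acknowledge
    # no-op in this sketch; SuperFetch uses this to drop its private copy of the job
  end
end

# Strategy with a concurrency limit of 5 (name "sketch" is arbitrary).
strategy = Sidekiq::Throttled::Strategy.new("sketch", concurrency: { limit: 5 })

payload = JSON.dump({ "class" => "MyJob", "jid" => "0123456789abcdef", "args" => [42] })
work    = FakeWork.new(payload, "queue:default")

# :enqueue pushes the raw payload back onto queue:low_priority via LPUSH.
strategy.requeue_throttled(work, with: :enqueue, to: :low_priority)

With with: :schedule instead of :enqueue, the strategy would ask the concurrency and threshold collections for a retry interval via retry_in and re-enqueue a new unit of work through Sidekiq::Client.enqueue_to_in, as shown in the diff above.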