lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.1 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.2

- old
+ new

@@ -141,45 +141,52 @@
                when Proc, Method
                  requeue_to.call(*JSON.parse(work.job)["args"])
                when NilClass
                  work.queue
                when String, Symbol
-                 requeue_to.to_s
+                 requeue_to
                else
                  raise ArgumentError, "Invalid argument for `to`"
                end

       target = work.queue if target.nil? || target.empty?

-      target.start_with?("queue:") ? target : "queue:#{target}"
+      target.to_s
     end

     # Push the job back to the head of the queue.
+    # The queue name is expected to include the "queue:" prefix, so we add it if it's missing.
     def re_enqueue_throttled(work, target_queue)
+      target_queue = "queue:#{target_queue}" unless target_queue.start_with?("queue:")
+
       case work.class.name
       when "Sidekiq::Pro::SuperFetch::UnitOfWork"
         # Calls SuperFetch UnitOfWork's requeue to remove the job from the
         # temporary queue and push job back to the head of the target queue, so that
         # the job won't be tried immediately after it was requeued (in most cases).
-        work.queue = target_queue if target_queue
+        work.queue = target_queue
         work.requeue
       else
         # This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
         Sidekiq.redis { |conn| conn.lpush(target_queue, work.job) }
       end
     end

+    # Reschedule the job to be executed later in the target queue.
+    # The queue name should NOT include the "queue:" prefix, so we remove it if it's present.
     def reschedule_throttled(work, target_queue)
-      message   = JSON.parse(work.job)
-      job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
-      job_args  = message["args"]
+      target_queue = target_queue.delete_prefix("queue:")
+      message      = JSON.parse(work.job)
+      job_class    = message.fetch("wrapped") { message.fetch("class") { return false } }
+      job_args     = message["args"]

       # Re-enqueue the job to the target queue at another time as a NEW unit of work
       # AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned
       # Technically, the job could processed twice if the process dies between the two lines,
       # but your job should be idempotent anyway, right?
       # The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk.
       Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args)
+      work.acknowledge
     end

     def retry_in(work)
       message = JSON.parse(work.job)