lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.1 vs lib/sidekiq/throttled/strategy.rb in sidekiq-throttled-1.5.2
- old
+ new
@@ -141,45 +141,52 @@
when Proc, Method
requeue_to.call(*JSON.parse(work.job)["args"])
when NilClass
work.queue
when String, Symbol
- requeue_to.to_s
+ requeue_to
else
raise ArgumentError, "Invalid argument for `to`"
end
target = work.queue if target.nil? || target.empty?
- target.start_with?("queue:") ? target : "queue:#{target}"
+ target.to_s
end
# Push the job back to the head of the queue.
+ # The queue name is expected to include the "queue:" prefix, so we add it if it's missing.
def re_enqueue_throttled(work, target_queue)
+ target_queue = "queue:#{target_queue}" unless target_queue.start_with?("queue:")
+
case work.class.name
when "Sidekiq::Pro::SuperFetch::UnitOfWork"
# Calls SuperFetch UnitOfWork's requeue to remove the job from the
# temporary queue and push job back to the head of the target queue, so that
# the job won't be tried immediately after it was requeued (in most cases).
- work.queue = target_queue if target_queue
+ work.queue = target_queue
work.requeue
else
# This is the same operation Sidekiq performs upon `Sidekiq::Worker.perform_async` call.
Sidekiq.redis { |conn| conn.lpush(target_queue, work.job) }
end
end
+ # Reschedule the job to be executed later in the target queue.
+ # The queue name should NOT include the "queue:" prefix, so we remove it if it's present.
def reschedule_throttled(work, target_queue)
- message = JSON.parse(work.job)
- job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
- job_args = message["args"]
+ target_queue = target_queue.delete_prefix("queue:")
+ message = JSON.parse(work.job)
+ job_class = message.fetch("wrapped") { message.fetch("class") { return false } }
+ job_args = message["args"]
# Re-enqueue the job to the target queue at another time as a NEW unit of work
# AND THEN mark this work as done, so SuperFetch doesn't think this instance is orphaned
# Technically, the job could processed twice if the process dies between the two lines,
# but your job should be idempotent anyway, right?
# The job running twice was already a risk with SuperFetch anyway and this doesn't really increase that risk.
Sidekiq::Client.enqueue_to_in(target_queue, retry_in(work), Object.const_get(job_class), *job_args)
+
work.acknowledge
end
def retry_in(work)
message = JSON.parse(work.job)