lib/sidekiq/throttled/unit_of_work.rb in sidekiq-throttled-0.5.0 vs lib/sidekiq/throttled/unit_of_work.rb in sidekiq-throttled-0.6.0
- old
+ new
@@ -1,33 +1,56 @@
# frozen_string_literal: true
require "sidekiq"
+require "sidekiq/throttled/queue_name"
+
module Sidekiq
module Throttled
+ # BRPOP response envelope.
+ #
+ # @see Throttled::Fetch
+ # @private
class UnitOfWork
- QUEUE_NAME_PREFIX_RE = /^.*queue:/
- private_constant :QUEUE_NAME_PREFIX_RE
-
+ # @return [String] Redis key where job was pulled from
attr_reader :queue
+ # @return [String] Job's JSON payload
attr_reader :job
+ # @param [String] queue Redis key where job was pulled from
+ # @param [String] job Job's JSON payload
def initialize(queue, job)
@queue = queue
@job = job
end
+ # Callback that is called by `Sidekiq::Processor` when job was
+ # succeccfully processed. Most this is used by `ReliableFetch`
+ # of Sidekiq Pro/Enterprise to remove job from running queue.
+ #
+ # @return [void]
def acknowledge
# do nothing
end
+ # Normalized `queue` name.
+ #
+ # @see QueueName.normalize
+ # @return [String]
def queue_name
- queue.sub(QUEUE_NAME_PREFIX_RE, "")
+ @queue_name ||= QueueName.normalize queue
end
+ # Pushes job back to the queue.
+ #
+ # @note This is triggered when job was not finished and Sidekiq server
+ # process was terminated (shutdowned). Thus it should be reverse of
+ # whatever fetcher was doing to pull the job out of queue.
+ #
+ # @return [void]
def requeue
- Sidekiq.redis { |conn| conn.rpush("queue:#{queue_name}", job) }
+ Sidekiq.redis { |conn| conn.rpush(QueueName.expand(queue_name), job) }
end
end
end
end