lib/sidekiq/api.rb in sidekiq-7.3.4 vs lib/sidekiq/api.rb in sidekiq-7.3.5
- old
+ new
@@ -666,10 +666,45 @@
Sidekiq.redis do |conn|
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(job))
end
end
+ def pop_each
+ Sidekiq.redis do |c|
+ size.times do
+ data, score = c.zpopmin(name, 1)&.first
+ break unless data
+ yield data, score
+ end
+ end
+ end
+
+ def retry_all
+ c = Sidekiq::Client.new
+ pop_each do |msg, _|
+ job = Sidekiq.load_json(msg)
+ # Manual retries should not count against the retry limit.
+ job["retry_count"] -= 1 if job["retry_count"]
+ c.push(job)
+ end
+ end
+
+ # Move all jobs from this Set to the Dead Set.
+ # See DeadSet#kill
+ def kill_all(notify_failure: false, ex: nil)
+ ds = DeadSet.new
+ opts = {notify_failure: notify_failure, ex: ex, trim: false}
+
+ begin
+ pop_each do |msg, _|
+ ds.kill(msg, opts)
+ end
+ ensure
+ ds.trim
+ end
+ end
+
def each
initial_size = @_size
offset_size = 0
page = -1
page_size = 50
@@ -763,44 +798,26 @@
alias_method :delete, :delete_by_jid
end
##
# The set of scheduled jobs within Sidekiq.
- # Based on this, you can search/filter for jobs. Here's an
- # example where I'm selecting jobs based on some complex logic
- # and deleting them from the scheduled set.
- #
# See the API wiki page for usage notes and examples.
#
class ScheduledSet < JobSet
def initialize
super("schedule")
end
end
##
# The set of retries within Sidekiq.
- # Based on this, you can search/filter for jobs. Here's an
- # example where I'm selecting all jobs of a certain type
- # and deleting them from the retry queue.
- #
# See the API wiki page for usage notes and examples.
#
class RetrySet < JobSet
def initialize
super("retry")
end
-
- # Enqueues all jobs pending within the retry set.
- def retry_all
- each(&:retry) while size > 0
- end
-
- # Kills all jobs pending within the retry set.
- def kill_all
- each(&:kill) while size > 0
- end
end
##
# The set of dead jobs within Sidekiq. Dead jobs have failed all of
# their retries and are helding in this set pending some sort of manual
@@ -809,24 +826,35 @@
class DeadSet < JobSet
def initialize
super("dead")
end
+ # Trim dead jobs which are over our storage limits
+ def trim
+ hash = Sidekiq.default_configuration
+ now = Time.now.to_f
+ Sidekiq.redis do |conn|
+ conn.multi do |transaction|
+ transaction.zremrangebyscore(name, "-inf", now - hash[:dead_timeout_in_seconds])
+ transaction.zremrangebyrank(name, 0, - hash[:dead_max_jobs])
+ end
+ end
+ end
+
# Add the given job to the Dead set.
# @param message [String] the job data as JSON
- # @option opts [Boolean] :notify_failure (true) Whether death handlers should be called
+ # @option opts [Boolean] :notify_failure (true) Whether death handlers should be called
+ # @option opts [Boolean] :trim (true) Whether Sidekiq should trim the structure to keep it within configuration
# @option opts [Exception] :ex (RuntimeError) An exception to pass to the death handlers
def kill(message, opts = {})
now = Time.now.to_f
Sidekiq.redis do |conn|
- conn.multi do |transaction|
- transaction.zadd(name, now.to_s, message)
- transaction.zremrangebyscore(name, "-inf", now - Sidekiq::Config::DEFAULTS[:dead_timeout_in_seconds])
- transaction.zremrangebyrank(name, 0, - Sidekiq::Config::DEFAULTS[:dead_max_jobs])
- end
+ conn.zadd(name, now.to_s, message)
end
+ trim if opts[:trim] != false
+
if opts[:notify_failure] != false
job = Sidekiq.load_json(message)
if opts[:ex]
ex = opts[:ex]
else
@@ -836,14 +864,9 @@
Sidekiq.default_configuration.death_handlers.each do |handle|
handle.call(job, ex)
end
end
true
- end
-
- # Enqueue all dead jobs
- def retry_all
- each(&:retry) while size > 0
end
end
##
# Enumerates the set of Sidekiq processes which are actively working