lib/sidekiq/api.rb in sidekiq-7.3.4 vs lib/sidekiq/api.rb in sidekiq-7.3.5

- old
+ new

@@ -666,10 +666,45 @@ Sidekiq.redis do |conn| conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(job)) end end + def pop_each + Sidekiq.redis do |c| + size.times do + data, score = c.zpopmin(name, 1)&.first + break unless data + yield data, score + end + end + end + + def retry_all + c = Sidekiq::Client.new + pop_each do |msg, _| + job = Sidekiq.load_json(msg) + # Manual retries should not count against the retry limit. + job["retry_count"] -= 1 if job["retry_count"] + c.push(job) + end + end + + # Move all jobs from this Set to the Dead Set. + # See DeadSet#kill + def kill_all(notify_failure: false, ex: nil) + ds = DeadSet.new + opts = {notify_failure: notify_failure, ex: ex, trim: false} + + begin + pop_each do |msg, _| + ds.kill(msg, opts) + end + ensure + ds.trim + end + end + def each initial_size = @_size offset_size = 0 page = -1 page_size = 50 @@ -763,44 +798,26 @@ alias_method :delete, :delete_by_jid end ## # The set of scheduled jobs within Sidekiq. - # Based on this, you can search/filter for jobs. Here's an - # example where I'm selecting jobs based on some complex logic - # and deleting them from the scheduled set. - # # See the API wiki page for usage notes and examples. # class ScheduledSet < JobSet def initialize super("schedule") end end ## # The set of retries within Sidekiq. - # Based on this, you can search/filter for jobs. Here's an - # example where I'm selecting all jobs of a certain type - # and deleting them from the retry queue. - # # See the API wiki page for usage notes and examples. # class RetrySet < JobSet def initialize super("retry") end - - # Enqueues all jobs pending within the retry set. - def retry_all - each(&:retry) while size > 0 - end - - # Kills all jobs pending within the retry set. - def kill_all - each(&:kill) while size > 0 - end end ## # The set of dead jobs within Sidekiq. Dead jobs have failed all of # their retries and are helding in this set pending some sort of manual @@ -809,24 +826,35 @@ class DeadSet < JobSet def initialize super("dead") end + # Trim dead jobs which are over our storage limits + def trim + hash = Sidekiq.default_configuration + now = Time.now.to_f + Sidekiq.redis do |conn| + conn.multi do |transaction| + transaction.zremrangebyscore(name, "-inf", now - hash[:dead_timeout_in_seconds]) + transaction.zremrangebyrank(name, 0, - hash[:dead_max_jobs]) + end + end + end + # Add the given job to the Dead set. # @param message [String] the job data as JSON - # @option opts [Boolean] :notify_failure (true) Whether death handlers should be called + # @option opts [Boolean] :notify_failure (true) Whether death handlers should be called + # @option opts [Boolean] :trim (true) Whether Sidekiq should trim the structure to keep it within configuration # @option opts [Exception] :ex (RuntimeError) An exception to pass to the death handlers def kill(message, opts = {}) now = Time.now.to_f Sidekiq.redis do |conn| - conn.multi do |transaction| - transaction.zadd(name, now.to_s, message) - transaction.zremrangebyscore(name, "-inf", now - Sidekiq::Config::DEFAULTS[:dead_timeout_in_seconds]) - transaction.zremrangebyrank(name, 0, - Sidekiq::Config::DEFAULTS[:dead_max_jobs]) - end + conn.zadd(name, now.to_s, message) end + trim if opts[:trim] != false + if opts[:notify_failure] != false job = Sidekiq.load_json(message) if opts[:ex] ex = opts[:ex] else @@ -836,14 +864,9 @@ Sidekiq.default_configuration.death_handlers.each do |handle| handle.call(job, ex) end end true - end - - # Enqueue all dead jobs - def retry_all - each(&:retry) while size > 0 end end ## # Enumerates the set of Sidekiq processes which are actively working