lib/sidekiq/api.rb in sidekiq-6.0.0 vs lib/sidekiq/api.rb in sidekiq-6.0.1

- old
+ new

@@ -1,26 +1,14 @@ # frozen_string_literal: true require "sidekiq" -module Sidekiq - module RedisScanner - def sscan(conn, key) - cursor = "0" - result = [] - loop do - cursor, values = conn.sscan(key, cursor) - result.push(*values) - break if cursor == "0" - end - result - end - end +require "zlib" +require "base64" +module Sidekiq class Stats - include RedisScanner - def initialize fetch_stats! end def processed @@ -75,15 +63,15 @@ conn.lrange("queue:default", -1, -1) end } processes = Sidekiq.redis { |conn| - sscan(conn, "processes") + conn.sscan_each("processes").to_a } queues = Sidekiq.redis { |conn| - sscan(conn, "queues") + conn.sscan_each("queues").to_a } pipe2_res = Sidekiq.redis { |conn| conn.pipelined do processes.each { |key| conn.hget(key, "busy") } @@ -140,15 +128,13 @@ def stat(s) @stats[s] end class Queues - include RedisScanner - def lengths Sidekiq.redis do |conn| - queues = sscan(conn, "queues") + queues = conn.sscan_each("queues").to_a lengths = conn.pipelined { queues.each do |queue| conn.llen("queue:#{queue}") end @@ -223,17 +209,16 @@ # job.delete if job.jid == 'abcdef1234567890' # end # class Queue include Enumerable - extend RedisScanner ## # Return all known queues within Redis. # def self.all - Sidekiq.redis { |c| sscan(c, "queues") }.sort.map { |q| Sidekiq::Queue.new(q) } + Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } end attr_reader :name def initialize(name = "default") @@ -347,11 +332,11 @@ safe_load(args[0], klass) do |target, method, _| "#{target}.#{method}" end when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" job_class = @item["wrapped"] || args[0] - if job_class == "ActionMailer::DeliveryJob" + if job_class == "ActionMailer::DeliveryJob" || job_class == "ActionMailer::MailDeliveryJob" # MailerClass#mailer_method args[0]["arguments"][0..1].join("#") else job_class end @@ -370,10 +355,13 @@ when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" job_args = self["wrapped"] ? args[0]["arguments"] : [] if (self["wrapped"] || args[0]) == "ActionMailer::DeliveryJob" # remove MailerClass, mailer_method and 'deliver_now' job_args.drop(3) + elsif (self["wrapped"] || args[0]) == "ActionMailer::MailDeliveryJob" + # remove MailerClass, mailer_method and 'deliver_now' + job_args.drop(3).first["args"] else job_args end else if self["encrypt"] @@ -398,10 +386,24 @@ def created_at Time.at(self["created_at"] || self["enqueued_at"] || 0).utc end + def tags + self["tags"] || [] + end + + def error_backtrace + # Cache nil values + if defined?(@error_backtrace) + @error_backtrace + else + value = self["error_backtrace"] + @error_backtrace = value && uncompress_backtrace(value) + end + end + attr_reader :queue def latency now = Time.now.to_f now - (@item["enqueued_at"] || @item["created_at"] || now) @@ -431,10 +433,21 @@ # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into # memory yet so the YAML can't be loaded. Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == "development" default end + + def uncompress_backtrace(backtrace) + if backtrace.is_a?(Array) + # Handle old jobs with previous backtrace format + backtrace + else + decoded = Base64.decode64(backtrace) + uncompressed = Zlib::Inflate.inflate(decoded) + Marshal.load(uncompressed) + end + end end class SortedEntry < Job attr_reader :score attr_reader :parent @@ -538,10 +551,21 @@ def size Sidekiq.redis { |c| c.zcard(name) } end + def scan(match, count = 100) + return to_enum(:scan, match) unless block_given? + + match = "*#{match}*" unless match.include?("*") + Sidekiq.redis do |conn| + conn.zscan_each(name, match: match, count: count) do |entry, score| + yield SortedEntry.new(self, score, entry) + end + end + end + def clear Sidekiq.redis do |conn| conn.del(name) end end @@ -574,32 +598,44 @@ end offset_size = initial_size - @_size end end + ## + # Fetch jobs that match a given time or Range. Job ID is an + # optional second argument. def fetch(score, jid = nil) + begin_score, end_score = + if score.is_a?(Range) + [score.first, score.last] + else + [score, score] + end + elements = Sidekiq.redis { |conn| - conn.zrangebyscore(name, score, score) + conn.zrangebyscore(name, begin_score, end_score, with_scores: true) } elements.each_with_object([]) do |element, result| - entry = SortedEntry.new(self, score, element) - if jid - result << entry if entry.jid == jid - else - result << entry - end + data, job_score = element + entry = SortedEntry.new(self, job_score, data) + result << entry if jid.nil? || entry.jid == jid end end ## # Find the job with the given JID within this sorted set. - # - # This is a slow, inefficient operation. Do not use under - # normal conditions. Sidekiq Pro contains a faster version. + # This is a slower O(n) operation. Do not use for app logic. def find_job(jid) - detect { |j| j.jid == jid } + Sidekiq.redis do |conn| + conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score| + job = JSON.parse(entry) + matched = job["jid"] == jid + return SortedEntry.new(self, score, entry) if matched + end + end + nil end def delete_by_value(name, value) Sidekiq.redis do |conn| ret = conn.zrem(name, value) @@ -718,22 +754,21 @@ # # Yields a Sidekiq::Process. # class ProcessSet include Enumerable - include RedisScanner def initialize(clean_plz = true) cleanup if clean_plz end # Cleans up dead processes recorded in Redis. # Returns the number of processes cleaned. def cleanup count = 0 Sidekiq.redis do |conn| - procs = sscan(conn, "processes").sort + procs = conn.sscan_each("processes").to_a.sort heartbeats = conn.pipelined { procs.each do |key| conn.hget(key, "info") end } @@ -749,11 +784,11 @@ end count end def each - procs = Sidekiq.redis { |conn| sscan(conn, "processes") }.sort + procs = Sidekiq.redis { |conn| conn.sscan_each("processes").to_a }.sort Sidekiq.redis do |conn| # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way @@ -883,15 +918,14 @@ # # run_at is an epoch Integer. # end # class Workers include Enumerable - include RedisScanner def each Sidekiq.redis do |conn| - procs = sscan(conn, "processes") + procs = conn.sscan_each("processes").to_a procs.sort.each do |key| valid, workers = conn.pipelined { conn.exists(key) conn.hgetall("#{key}:workers") } @@ -909,10 +943,10 @@ # Not very efficient if you have lots of Sidekiq # processes but the alternative is a global counter # which can easily get out of sync with crashy processes. def size Sidekiq.redis do |conn| - procs = sscan(conn, "processes") + procs = conn.sscan_each("processes").to_a if procs.empty? 0 else conn.pipelined { procs.each do |key|