Unified diff of lib/sidekiq/api.rb between sidekiq-6.0.0 (old) and sidekiq-6.0.1 (new)
- old: line removed in 6.0.1
+ new: line added in 6.0.1
@@ -1,26 +1,14 @@
# frozen_string_literal: true
require "sidekiq"
-module Sidekiq
- module RedisScanner
- def sscan(conn, key)
- cursor = "0"
- result = []
- loop do
- cursor, values = conn.sscan(key, cursor)
- result.push(*values)
- break if cursor == "0"
- end
- result
- end
- end
+require "zlib"
+require "base64"
+module Sidekiq
class Stats
- include RedisScanner
-
# Eagerly fetch all stats from Redis at construction time,
# so readers like #processed work against a consistent snapshot.
def initialize
  fetch_stats!
end
def processed
@@ -75,15 +63,15 @@
conn.lrange("queue:default", -1, -1)
end
}
processes = Sidekiq.redis { |conn|
- sscan(conn, "processes")
+ conn.sscan_each("processes").to_a
}
queues = Sidekiq.redis { |conn|
- sscan(conn, "queues")
+ conn.sscan_each("queues").to_a
}
pipe2_res = Sidekiq.redis { |conn|
conn.pipelined do
processes.each { |key| conn.hget(key, "busy") }
@@ -140,15 +128,13 @@
# Look up a single raw stat value by key; nil when the key is unknown.
def stat(s)
  @stats.fetch(s, nil)
end
class Queues
- include RedisScanner
-
def lengths
Sidekiq.redis do |conn|
- queues = sscan(conn, "queues")
+ queues = conn.sscan_each("queues").to_a
lengths = conn.pipelined {
queues.each do |queue|
conn.llen("queue:#{queue}")
end
@@ -223,17 +209,16 @@
# job.delete if job.jid == 'abcdef1234567890'
# end
#
class Queue
include Enumerable
- extend RedisScanner
##
# Return all known queues within Redis.
#
##
# Return all known queues within Redis, sorted by name,
# wrapped as Sidekiq::Queue instances.
def self.all
  queue_names = Sidekiq.redis { |conn| conn.sscan_each("queues").to_a }
  queue_names.sort.map { |queue_name| Sidekiq::Queue.new(queue_name) }
end
attr_reader :name
def initialize(name = "default")
@@ -347,11 +332,11 @@
safe_load(args[0], klass) do |target, method, _|
"#{target}.#{method}"
end
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
job_class = @item["wrapped"] || args[0]
- if job_class == "ActionMailer::DeliveryJob"
+ if job_class == "ActionMailer::DeliveryJob" || job_class == "ActionMailer::MailDeliveryJob"
# MailerClass#mailer_method
args[0]["arguments"][0..1].join("#")
else
job_class
end
@@ -370,10 +355,13 @@
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
job_args = self["wrapped"] ? args[0]["arguments"] : []
if (self["wrapped"] || args[0]) == "ActionMailer::DeliveryJob"
# remove MailerClass, mailer_method and 'deliver_now'
job_args.drop(3)
+ elsif (self["wrapped"] || args[0]) == "ActionMailer::MailDeliveryJob"
+ # remove MailerClass, mailer_method and 'deliver_now'
+ job_args.drop(3).first["args"]
else
job_args
end
else
if self["encrypt"]
@@ -398,10 +386,24 @@
# Time (UTC) the job was created, falling back to enqueued_at and
# then to the epoch when neither timestamp is present in the payload.
def created_at
  epoch = self["created_at"] || self["enqueued_at"] || 0
  Time.at(epoch).utc
end
# User-supplied tags attached to the job payload; [] when none were set.
def tags
  stored = self["tags"]
  stored || []
end
+
# Lazily decode the backtrace stored on the job, memoizing the result.
def error_backtrace
  # defined?() lets us cache a nil result: jobs without a recorded
  # backtrace should not re-read the payload on every call.
  return @error_backtrace if defined?(@error_backtrace)

  raw = self["error_backtrace"]
  @error_backtrace = raw && uncompress_backtrace(raw)
end
+
attr_reader :queue
def latency
now = Time.now.to_f
now - (@item["enqueued_at"] || @item["created_at"] || now)
@@ -431,10 +433,21 @@
# #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
# memory yet so the YAML can't be loaded.
Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == "development"
default
end
+
# Decode a backtrace stored in Redis: newer jobs persist it as a
# Base64-encoded, zlib-deflated Marshal dump; older jobs stored a
# plain Array, which is passed through untouched.
def uncompress_backtrace(backtrace)
  # Handle old jobs with the previous (uncompressed Array) format.
  return backtrace if backtrace.is_a?(Array)

  # NOTE(review): Marshal.load is acceptable here only because this
  # payload was written by Sidekiq itself; never feed it untrusted data.
  Marshal.load(Zlib::Inflate.inflate(Base64.decode64(backtrace)))
end
end
class SortedEntry < Job
attr_reader :score
attr_reader :parent
@@ -538,10 +551,21 @@
# Number of entries currently in this sorted set (Redis ZCARD).
def size
  Sidekiq.redis { |conn| conn.zcard(name) }
end
##
# Scan this sorted set for entries whose serialized payload matches
# +match+, yielding each one as a SortedEntry.
#
# match - String pattern; automatically wrapped in "*...*" unless the
#         caller already supplied glob wildcards.
# count - Integer hint for how many elements Redis examines per SCAN
#         iteration (a batching hint, not a result limit).
#
# Returns an Enumerator when no block is given.
def scan(match, count = 100)
  # Forward count so the enumerator form behaves identically to the
  # block form (previously it was dropped, silently reverting to 100).
  return to_enum(:scan, match, count) unless block_given?

  match = "*#{match}*" unless match.include?("*")
  Sidekiq.redis do |conn|
    conn.zscan_each(name, match: match, count: count) do |entry, score|
      yield SortedEntry.new(self, score, entry)
    end
  end
end
+
# Remove every entry at once by deleting the underlying Redis key.
def clear
  Sidekiq.redis { |conn| conn.del(name) }
end
@@ -574,32 +598,44 @@
end
offset_size = initial_size - @_size
end
end
##
# Fetch jobs that match a given time or Range. Job ID is an
# optional second argument.
def fetch(score, jid = nil)
  # A bare score is treated as a degenerate range [score, score].
  if score.is_a?(Range)
    begin_score = score.first
    end_score = score.last
  else
    begin_score = end_score = score
  end

  rows = Sidekiq.redis do |conn|
    conn.zrangebyscore(name, begin_score, end_score, with_scores: true)
  end

  # Each row is [payload, score]; keep every entry unless a jid filter
  # was supplied, in which case only the matching job survives.
  rows.each_with_object([]) do |(data, job_score), matches|
    entry = SortedEntry.new(self, job_score, data)
    matches << entry if jid.nil? || entry.jid == jid
  end
end
##
# Find the job with the given JID within this sorted set.
# This is a slower O(n) operation. Do not use for app logic.
def find_job(jid)
  Sidekiq.redis do |conn|
    # The match pattern narrows the scan server-side; the JSON check
    # below guards against substring false positives.
    conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score|
      return SortedEntry.new(self, score, entry) if JSON.parse(entry)["jid"] == jid
    end
  end
  nil
end
def delete_by_value(name, value)
Sidekiq.redis do |conn|
ret = conn.zrem(name, value)
@@ -718,22 +754,21 @@
#
# Yields a Sidekiq::Process.
#
class ProcessSet
include Enumerable
- include RedisScanner
# Build the process set, optionally pruning dead process records
# from Redis first (the default).
def initialize(clean_plz = true)
  if clean_plz
    cleanup
  end
end
# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
def cleanup
count = 0
Sidekiq.redis do |conn|
- procs = sscan(conn, "processes").sort
+ procs = conn.sscan_each("processes").to_a.sort
heartbeats = conn.pipelined {
procs.each do |key|
conn.hget(key, "info")
end
}
@@ -749,11 +784,11 @@
end
count
end
def each
- procs = Sidekiq.redis { |conn| sscan(conn, "processes") }.sort
+ procs = Sidekiq.redis { |conn| conn.sscan_each("processes").to_a }.sort
Sidekiq.redis do |conn|
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
@@ -883,15 +918,14 @@
# # run_at is an epoch Integer.
# end
#
class Workers
include Enumerable
- include RedisScanner
def each
Sidekiq.redis do |conn|
- procs = sscan(conn, "processes")
+ procs = conn.sscan_each("processes").to_a
procs.sort.each do |key|
valid, workers = conn.pipelined {
conn.exists(key)
conn.hgetall("#{key}:workers")
}
@@ -909,10 +943,10 @@
# Not very efficient if you have lots of Sidekiq
# processes but the alternative is a global counter
# which can easily get out of sync with crashy processes.
def size
Sidekiq.redis do |conn|
- procs = sscan(conn, "processes")
+ procs = conn.sscan_each("processes").to_a
if procs.empty?
0
else
conn.pipelined {
procs.each do |key|