lib/sidekiq/api.rb in sidekiq-6.5.12 vs lib/sidekiq/api.rb in sidekiq-7.0.0.beta1
- lines removed (present only in 6.5.12)
+ lines added (present only in 7.0.0.beta1)
@@ -4,14 +4,11 @@
require "zlib"
require "set"
require "base64"
-if ENV["SIDEKIQ_METRICS_BETA"]
- require "sidekiq/metrics/deploy"
- require "sidekiq/metrics/query"
-end
+require "sidekiq/metrics/query"
#
# Sidekiq's Data API provides a Ruby object model on top
# of Sidekiq's runtime data in Redis. This API should never
# be used within application code for business logic.
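Metrics lost its beta gate in 7.0: the query API is required unconditionally with sidekiq/api, where 6.5 only defined it when SIDEKIQ_METRICS_BETA was exported. A minimal sketch of the observable difference (output value illustrative):

    # 6.5.x: constant only defined after opting in, e.g.
    #   SIDEKIQ_METRICS_BETA=1 bundle exec sidekiq
    # 7.0.x: loaded whenever the API itself is loaded
    require "sidekiq/api"
    defined?(Sidekiq::Metrics::Query)  # => "constant" on 7.0.0.beta1
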
@@ -68,11 +65,22 @@
def default_queue_latency
stat :default_queue_latency
end
def queues
- Sidekiq::Stats::Queues.new.lengths
+ Sidekiq.redis do |conn|
+ queues = conn.sscan("queues").to_a
+
+ lengths = conn.pipelined { |pipeline|
+ queues.each do |queue|
+ pipeline.llen("queue:#{queue}")
+ end
+ }
+
+ array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size }
+ array_of_arrays.to_h
+ end
end
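The Stats::Queues helper class was folded into Stats#queues itself (its removal appears further down); callers still get a Hash of queue name => depth, sorted largest first. A usage sketch, with queue names and counts purely illustrative:

    stats = Sidekiq::Stats.new
    stats.queues                 # => {"critical" => 12, "default" => 3, "low" => 0}
    stats.enqueued               # total jobs across all queues
    stats.default_queue_latency  # seconds the oldest "default" job has waited
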
# O(1) redis calls
# @api private
def fetch_stats_fast!
@@ -115,15 +123,15 @@
# O(number of processes + number of queues) redis calls
# @api private
def fetch_stats_slow!
processes = Sidekiq.redis { |conn|
- conn.sscan_each("processes").to_a
+ conn.sscan("processes").to_a
}
queues = Sidekiq.redis { |conn|
- conn.sscan_each("queues").to_a
+ conn.sscan("queues").to_a
}
pipe2_res = Sidekiq.redis { |conn|
conn.pipelined do |pipeline|
processes.each { |key| pipeline.hget(key, "busy") }
@@ -131,11 +139,11 @@
end
}
s = processes.size
workers_size = pipe2_res[0...s].sum(&:to_i)
- enqueued = pipe2_res[s..-1].sum(&:to_i)
+ enqueued = pipe2_res[s..].sum(&:to_i)
@stats[:workers_size] = workers_size
@stats[:enqueued] = enqueued
@stats
end
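The sscan_each -> sscan renames throughout this file come from Sidekiq 7 swapping the redis gem for redis-client, whose scan helpers drop the _each suffix and return an Enumerator when called without a block; pipe2_res[s..] is simply the endless-range spelling of [s..-1]. A sketch of the equivalent calls:

    Sidekiq.redis do |conn|
      conn.sscan("queues").to_a         # 7.x, redis-client style
      # conn.sscan_each("queues").to_a  # 6.x, redis gem style
    end
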
@@ -166,29 +174,12 @@
def stat(s)
fetch_stats_slow! if @stats[s].nil?
@stats[s] || raise(ArgumentError, "Unknown stat #{s}")
end
- class Queues
- def lengths
- Sidekiq.redis do |conn|
- queues = conn.sscan_each("queues").to_a
-
- lengths = conn.pipelined { |pipeline|
- queues.each do |queue|
- pipeline.llen("queue:#{queue}")
- end
- }
-
- array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size }
- array_of_arrays.to_h
- end
- end
- end
-
class History
- def initialize(days_previous, start_date = nil)
+ def initialize(days_previous, start_date = nil, pool: nil)
# we only store five years of data in Redis
raise ArgumentError if days_previous < 1 || days_previous > (5 * 365)
@days_previous = days_previous
@start_date = start_date || Time.now.utc.to_date
end
@@ -209,19 +200,14 @@
date.strftime("%Y-%m-%d")
}
keys = dates.map { |datestr| "stat:#{stat}:#{datestr}" }
- begin
- Sidekiq.redis do |conn|
- conn.mget(keys).each_with_index do |value, idx|
- stat_hash[dates[idx]] = value ? value.to_i : 0
- end
+ Sidekiq.redis do |conn|
+ conn.mget(keys).each_with_index do |value, idx|
+ stat_hash[dates[idx]] = value ? value.to_i : 0
end
- rescue RedisConnection.adapter::CommandError
- # mget will trigger a CROSSSLOT error when run against a Cluster
- # TODO Someone want to add Cluster support?
end
stat_hash
end
end
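date_stat_hash backs Stats::History, which returns per-day counters keyed by "YYYY-MM-DD" strings; note the Redis Cluster CROSSSLOT rescue is dropped rather than replaced. A usage sketch (dates and counts illustrative):

    history = Sidekiq::Stats::History.new(7)   # the last seven days
    history.processed  # => {"2022-09-30" => 1234, "2022-09-29" => 987, ...}
    history.failed     # same shape, failure counts per day
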
@@ -245,11 +231,11 @@
##
# Fetch all known queues within Redis.
#
# @return [Array<Sidekiq::Queue>]
def self.all
- Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
+ Sidekiq.redis { |c| c.sscan("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
end
attr_reader :name
# @param name [String] the name of the queue
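Queue.all still enumerates the "queues" set and wraps each name; only the scan call changed. A quick inspection sketch:

    Sidekiq::Queue.all.each do |q|
      puts "#{q.name}: #{q.size} jobs, latency #{q.latency.round(1)}s"
    end
    Sidekiq::Queue.new("default").clear   # drains the queue entirely
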
@@ -386,16 +372,11 @@
end
def display_class
# Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
@klass ||= self["display_class"] || begin
- case klass
- when /\ASidekiq::Extensions::Delayed/
- safe_load(args[0], klass) do |target, method, _|
- "#{target}.#{method}"
- end
- when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
+ if klass == "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
job_class = @item["wrapped"] || args[0]
if job_class == "ActionMailer::DeliveryJob" || job_class == "ActionMailer::MailDeliveryJob"
# MailerClass#mailer_method
args[0]["arguments"][0..1].join("#")
else
@@ -407,20 +388,11 @@
end
end
def display_args
# Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
- @display_args ||= case klass
- when /\ASidekiq::Extensions::Delayed/
- safe_load(args[0], args) do |_, _, arg, kwarg|
- if !kwarg || kwarg.empty?
- arg
- else
- [arg, kwarg]
- end
- end
- when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
+ @display_args ||= if klass == "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
job_args = self["wrapped"] ? args[0]["arguments"] : []
if (self["wrapped"] || args[0]) == "ActionMailer::DeliveryJob"
# remove MailerClass, mailer_method and 'deliver_now'
job_args.drop(3)
elsif (self["wrapped"] || args[0]) == "ActionMailer::MailDeliveryJob"
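Both display helpers lose their Sidekiq::Extensions::Delayed* branches because the YAML-based delayed extensions are gone in 7.0; only ActiveJob's JobWrapper is still unwrapped. A sketch of how these read in practice when inspecting a queue:

    Sidekiq::Queue.new("default").each do |job|
      puts "#{job.display_class} #{job.display_args.inspect}"
    end
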
@@ -489,35 +461,14 @@
@item ? @item[name] : nil
end
private
- def safe_load(content, default)
- yield(*YAML.load(content))
- rescue => ex
- # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
- # memory yet so the YAML can't be loaded.
- # TODO is this still necessary? Zeitwerk reloader should handle?
- Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == "development"
- default
- end
-
def uncompress_backtrace(backtrace)
- if backtrace.is_a?(Array)
- # Handle old jobs with raw Array backtrace format
- backtrace
- else
- decoded = Base64.decode64(backtrace)
- uncompressed = Zlib::Inflate.inflate(decoded)
- begin
- Sidekiq.load_json(uncompressed)
- rescue
- # Handle old jobs with marshalled backtrace format
- # TODO Remove in 7.x
- Marshal.load(uncompressed)
- end
- end
+ decoded = Base64.decode64(backtrace)
+ uncompressed = Zlib::Inflate.inflate(decoded)
+ Sidekiq.load_json(uncompressed)
end
end
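With safe_load gone (its only caller was the removed delayed-extension unwrapping) and the legacy Array/Marshal backtrace formats dropped, 7.0 assumes every stored backtrace is Base64-wrapped, Zlib-deflated JSON. A hedged sketch of the inverse, write-side transform (helper name illustrative, not the actual client internals):

    def compress_backtrace(backtrace)
      Base64.encode64(Zlib::Deflate.deflate(Sidekiq.dump_json(backtrace)))
    end
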
# Represents a job within a Redis sorted set where the score
# represents a timestamp associated with the job. This timestamp
@@ -654,11 +605,11 @@
def scan(match, count = 100)
return to_enum(:scan, match, count) unless block_given?
match = "*#{match}*" unless match.include?("*")
Sidekiq.redis do |conn|
- conn.zscan_each(name, match: match, count: count) do |entry, score|
+ conn.zscan(name, match: match, count: count) do |entry, score|
yield SortedEntry.new(self, score, entry)
end
end
end
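zscan here is the same client rename as the sscan changes above. JobSet#scan does a server-side pattern match over the serialized entries and yields SortedEntry objects, so a class check is still needed on each hit. A usage sketch (job class name illustrative):

    Sidekiq::RetrySet.new.scan("WelcomeEmailJob") do |entry|
      entry.delete if entry.klass == "WelcomeEmailJob"
    end
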
@@ -744,11 +695,11 @@
#
# @param jid [String] the job identifier
# @return [SortedEntry] the record or nil
def find_job(jid)
Sidekiq.redis do |conn|
- conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score|
+ conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score|
job = JSON.parse(entry)
matched = job["jid"] == jid
return SortedEntry.new(self, score, entry) if matched
end
end
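find_job narrows the same zscan to a single JID and returns the first exact match, or nil. Sketch (JID value illustrative):

    entry = Sidekiq::ScheduledSet.new.find_job("0123456789abcdef01234567")
    entry&.delete
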
@@ -790,16 +741,12 @@
# The set of scheduled jobs within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting jobs based on some complex logic
# and deleting them from the scheduled set.
#
- # r = Sidekiq::ScheduledSet.new
- # r.select do |scheduled|
- # scheduled.klass == 'Sidekiq::Extensions::DelayedClass' &&
- # scheduled.args[0] == 'User' &&
- # scheduled.args[1] == 'setup_new_subscriber'
- # end.map(&:delete)
+ # See the API wiki page for usage notes and examples.
+ #
class ScheduledSet < JobSet
def initialize
super "schedule"
end
end
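The deleted doc example leaned on the removed DelayedClass extension, hence the pointer to the wiki. A modern equivalent of the same select-and-delete pattern (class and argument values illustrative):

    Sidekiq::ScheduledSet.new.select { |job|
      job.klass == "NewsletterJob" && job.args[0] == "weekly"
    }.map(&:delete)
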
@@ -808,16 +755,12 @@
# The set of retries within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the retry queue.
#
- # r = Sidekiq::RetrySet.new
- # r.select do |retri|
- # retri.klass == 'Sidekiq::Extensions::DelayedClass' &&
- # retri.args[0] == 'User' &&
- # retri.args[1] == 'setup_new_subscriber'
- # end.map(&:delete)
+ # See the API wiki page for usage notes and examples.
+ #
class RetrySet < JobSet
def initialize
super "retry"
end
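Same story as ScheduledSet above. A sketch of common bulk operations on retries (class name illustrative):

    rs = Sidekiq::RetrySet.new
    rs.select { |job| job.klass == "NewsletterJob" }.each(&:retry)
    rs.retry_all   # or push every retry back onto its queue
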
@@ -847,42 +790,30 @@
def kill(message, opts = {})
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do |transaction|
transaction.zadd(name, now.to_s, message)
- transaction.zremrangebyscore(name, "-inf", now - self.class.timeout)
- transaction.zremrangebyrank(name, 0, - self.class.max_jobs)
+ transaction.zremrangebyscore(name, "-inf", now - Sidekiq::Config::DEFAULTS[:dead_timeout_in_seconds])
+ transaction.zremrangebyrank(name, 0, - Sidekiq::Config::DEFAULTS[:dead_max_jobs])
end
end
if opts[:notify_failure] != false
job = Sidekiq.load_json(message)
r = RuntimeError.new("Job killed by API")
r.set_backtrace(caller)
- Sidekiq.death_handlers.each do |handle|
+ Sidekiq.default_configuration.death_handlers.each do |handle|
handle.call(job, r)
end
end
true
end
# Enqueue all dead jobs
def retry_all
each(&:retry) while size > 0
end
-
- # The maximum size of the Dead set. Older entries will be trimmed
- # to stay within this limit. Default value is 10,000.
- def self.max_jobs
- Sidekiq[:dead_max_jobs]
- end
-
- # The time limit for entries within the Dead set. Older entries will be thrown away.
- # Default value is six months.
- def self.timeout
- Sidekiq[:dead_timeout_in_seconds]
- end
end
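DeadSet.max_jobs and DeadSet.timeout disappear in favor of reading Sidekiq::Config::DEFAULTS directly (so kill as written trims against the defaults, not an overridden value), and death handlers now live on the default configuration object. A hedged sketch of tuning those limits in 7.x, assuming the Config object accepts hash-style assignment (values illustrative):

    Sidekiq.configure_server do |config|
      config[:dead_max_jobs] = 50_000
      config[:dead_timeout_in_seconds] = 90 * 24 * 60 * 60   # 90 days
    end
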
##
# Enumerates the set of Sidekiq processes which are actively working
# right now. Each process sends a heartbeat to Redis every 5 seconds
@@ -907,11 +838,11 @@
# dont run cleanup more than once per minute
return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) }
count = 0
Sidekiq.redis do |conn|
- procs = conn.sscan_each("processes").to_a
+ procs = conn.sscan("processes").to_a
heartbeats = conn.pipelined { |pipeline|
procs.each do |key|
pipeline.hget(key, "info")
end
}
@@ -927,11 +858,11 @@
count
end
def each
result = Sidekiq.redis { |conn|
- procs = conn.sscan_each("processes").to_a.sort
+ procs = conn.sscan("processes").to_a.sort
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
conn.pipelined do |pipeline|
@@ -1019,11 +950,11 @@
def tag
self["tag"]
end
def labels
- Array(self["labels"])
+ self["labels"].to_a
end
def [](key)
@attribs[key]
end
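Process#labels now assumes the heartbeat stores labels as a JSON array (nil.to_a still yields []), where 6.x coerced with Array(). A sketch of acting on processes by label ("canary" is an illustrative label value):

    Sidekiq::ProcessSet.new.each do |process|
      process.quiet! if process.labels.include?("canary")
    end
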
@@ -1101,35 +1032,28 @@
class WorkSet
include Enumerable
def each(&block)
results = []
- procs = nil
- all_works = nil
-
Sidekiq.redis do |conn|
- procs = conn.sscan_each("processes").to_a.sort
-
- all_works = conn.pipelined do |pipeline|
- procs.each do |key|
+ procs = conn.sscan("processes").to_a
+ procs.sort.each do |key|
+ valid, workers = conn.pipelined { |pipeline|
+ pipeline.exists(key)
pipeline.hgetall("#{key}:work")
+ }
+ next unless valid > 0
+ workers.each_pair do |tid, json|
+ hsh = Sidekiq.load_json(json)
+ p = hsh["payload"]
+ # avoid breaking API, this is a side effect of the JSON optimization in #4316
+ hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
+ results << [key, tid, hsh]
end
end
end
- procs.zip(all_works).each do |key, workers|
- workers.each_pair do |tid, json|
- next if json.empty?
-
- hsh = Sidekiq.load_json(json)
- p = hsh["payload"]
- # avoid breaking API, this is a side effect of the JSON optimization in #4316
- hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
- results << [key, tid, hsh]
- end
- end
-
results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block)
end
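WorkSet#each now verifies each process key still exists before reading its work hash, instead of zipping one big pipeline afterward, so entries from processes that died mid-scan are skipped. The yielded shape is unchanged; a sketch of iterating in-flight work (field access mirrors the hash built above):

    Sidekiq::WorkSet.new.each do |process_id, thread_id, work|
      puts "#{work["queue"]}: #{work["payload"]["class"]} running since #{Time.at(work["run_at"])}"
    end
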
# Note that #size is only as accurate as Sidekiq's heartbeat,
# which happens every 5 seconds. It is NOT real-time.
@@ -1137,10 +1061,10 @@
# Not very efficient if you have lots of Sidekiq
# processes but the alternative is a global counter
# which can easily get out of sync with crashy processes.
def size
Sidekiq.redis do |conn|
- procs = conn.sscan_each("processes").to_a
+ procs = conn.sscan("processes").to_a
if procs.empty?
0
else
conn.pipelined { |pipeline|
procs.each do |key|