lib/sidekiq/api.rb in sidekiq-6.5.12 vs lib/sidekiq/api.rb in sidekiq-7.0.0.beta1

- old
+ new

@@ -4,14 +4,11 @@ require "zlib" require "set" require "base64" -if ENV["SIDEKIQ_METRICS_BETA"] - require "sidekiq/metrics/deploy" - require "sidekiq/metrics/query" -end +require "sidekiq/metrics/query" # # Sidekiq's Data API provides a Ruby object model on top # of Sidekiq's runtime data in Redis. This API should never # be used within application code for business logic. @@ -68,11 +65,22 @@ def default_queue_latency stat :default_queue_latency end def queues - Sidekiq::Stats::Queues.new.lengths + Sidekiq.redis do |conn| + queues = conn.sscan("queues").to_a + + lengths = conn.pipelined { |pipeline| + queues.each do |queue| + pipeline.llen("queue:#{queue}") + end + } + + array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size } + array_of_arrays.to_h + end end # O(1) redis calls # @api private def fetch_stats_fast! @@ -115,15 +123,15 @@ # O(number of processes + number of queues) redis calls # @api private def fetch_stats_slow! processes = Sidekiq.redis { |conn| - conn.sscan_each("processes").to_a + conn.sscan("processes").to_a } queues = Sidekiq.redis { |conn| - conn.sscan_each("queues").to_a + conn.sscan("queues").to_a } pipe2_res = Sidekiq.redis { |conn| conn.pipelined do |pipeline| processes.each { |key| pipeline.hget(key, "busy") } @@ -131,11 +139,11 @@ end } s = processes.size workers_size = pipe2_res[0...s].sum(&:to_i) - enqueued = pipe2_res[s..-1].sum(&:to_i) + enqueued = pipe2_res[s..].sum(&:to_i) @stats[:workers_size] = workers_size @stats[:enqueued] = enqueued @stats end @@ -166,29 +174,12 @@ def stat(s) fetch_stats_slow! if @stats[s].nil? @stats[s] || raise(ArgumentError, "Unknown stat #{s}") end - class Queues - def lengths - Sidekiq.redis do |conn| - queues = conn.sscan_each("queues").to_a - - lengths = conn.pipelined { |pipeline| - queues.each do |queue| - pipeline.llen("queue:#{queue}") - end - } - - array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size } - array_of_arrays.to_h - end - end - end - class History - def initialize(days_previous, start_date = nil) + def initialize(days_previous, start_date = nil, pool: nil) # we only store five years of data in Redis raise ArgumentError if days_previous < 1 || days_previous > (5 * 365) @days_previous = days_previous @start_date = start_date || Time.now.utc.to_date end @@ -209,19 +200,14 @@ date.strftime("%Y-%m-%d") } keys = dates.map { |datestr| "stat:#{stat}:#{datestr}" } - begin - Sidekiq.redis do |conn| - conn.mget(keys).each_with_index do |value, idx| - stat_hash[dates[idx]] = value ? value.to_i : 0 - end + Sidekiq.redis do |conn| + conn.mget(keys).each_with_index do |value, idx| + stat_hash[dates[idx]] = value ? value.to_i : 0 end - rescue RedisConnection.adapter::CommandError - # mget will trigger a CROSSSLOT error when run against a Cluster - # TODO Someone want to add Cluster support? end stat_hash end end @@ -245,11 +231,11 @@ ## # Fetch all known queues within Redis. # # @return [Array<Sidekiq::Queue>] def self.all - Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } + Sidekiq.redis { |c| c.sscan("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } end attr_reader :name # @param name [String] the name of the queue @@ -386,16 +372,11 @@ end def display_class # Unwrap known wrappers so they show up in a human-friendly manner in the Web UI @klass ||= self["display_class"] || begin - case klass - when /\ASidekiq::Extensions::Delayed/ - safe_load(args[0], klass) do |target, method, _| - "#{target}.#{method}" - end - when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" + if klass == "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" job_class = @item["wrapped"] || args[0] if job_class == "ActionMailer::DeliveryJob" || job_class == "ActionMailer::MailDeliveryJob" # MailerClass#mailer_method args[0]["arguments"][0..1].join("#") else @@ -407,20 +388,11 @@ end end def display_args # Unwrap known wrappers so they show up in a human-friendly manner in the Web UI - @display_args ||= case klass - when /\ASidekiq::Extensions::Delayed/ - safe_load(args[0], args) do |_, _, arg, kwarg| - if !kwarg || kwarg.empty? - arg - else - [arg, kwarg] - end - end - when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" + @display_args ||= if klass == "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" job_args = self["wrapped"] ? args[0]["arguments"] : [] if (self["wrapped"] || args[0]) == "ActionMailer::DeliveryJob" # remove MailerClass, mailer_method and 'deliver_now' job_args.drop(3) elsif (self["wrapped"] || args[0]) == "ActionMailer::MailDeliveryJob" @@ -489,35 +461,14 @@ @item ? @item[name] : nil end private - def safe_load(content, default) - yield(*YAML.load(content)) - rescue => ex - # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into - # memory yet so the YAML can't be loaded. - # TODO is this still necessary? Zeitwerk reloader should handle? - Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == "development" - default - end - def uncompress_backtrace(backtrace) - if backtrace.is_a?(Array) - # Handle old jobs with raw Array backtrace format - backtrace - else - decoded = Base64.decode64(backtrace) - uncompressed = Zlib::Inflate.inflate(decoded) - begin - Sidekiq.load_json(uncompressed) - rescue - # Handle old jobs with marshalled backtrace format - # TODO Remove in 7.x - Marshal.load(uncompressed) - end - end + decoded = Base64.decode64(backtrace) + uncompressed = Zlib::Inflate.inflate(decoded) + Sidekiq.load_json(uncompressed) end end # Represents a job within a Redis sorted set where the score # represents a timestamp associated with the job. This timestamp @@ -654,11 +605,11 @@ def scan(match, count = 100) return to_enum(:scan, match, count) unless block_given? match = "*#{match}*" unless match.include?("*") Sidekiq.redis do |conn| - conn.zscan_each(name, match: match, count: count) do |entry, score| + conn.zscan(name, match: match, count: count) do |entry, score| yield SortedEntry.new(self, score, entry) end end end @@ -744,11 +695,11 @@ # # @param jid [String] the job identifier # @return [SortedEntry] the record or nil def find_job(jid) Sidekiq.redis do |conn| - conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score| + conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score| job = JSON.parse(entry) matched = job["jid"] == jid return SortedEntry.new(self, score, entry) if matched end end @@ -790,16 +741,12 @@ # The set of scheduled jobs within Sidekiq. # Based on this, you can search/filter for jobs. Here's an # example where I'm selecting jobs based on some complex logic # and deleting them from the scheduled set. # - # r = Sidekiq::ScheduledSet.new - # r.select do |scheduled| - # scheduled.klass == 'Sidekiq::Extensions::DelayedClass' && - # scheduled.args[0] == 'User' && - # scheduled.args[1] == 'setup_new_subscriber' - # end.map(&:delete) + # See the API wiki page for usage notes and examples. + # class ScheduledSet < JobSet def initialize super "schedule" end end @@ -808,16 +755,12 @@ # The set of retries within Sidekiq. # Based on this, you can search/filter for jobs. Here's an # example where I'm selecting all jobs of a certain type # and deleting them from the retry queue. # - # r = Sidekiq::RetrySet.new - # r.select do |retri| - # retri.klass == 'Sidekiq::Extensions::DelayedClass' && - # retri.args[0] == 'User' && - # retri.args[1] == 'setup_new_subscriber' - # end.map(&:delete) + # See the API wiki page for usage notes and examples. + # class RetrySet < JobSet def initialize super "retry" end @@ -847,42 +790,30 @@ def kill(message, opts = {}) now = Time.now.to_f Sidekiq.redis do |conn| conn.multi do |transaction| transaction.zadd(name, now.to_s, message) - transaction.zremrangebyscore(name, "-inf", now - self.class.timeout) - transaction.zremrangebyrank(name, 0, - self.class.max_jobs) + transaction.zremrangebyscore(name, "-inf", now - Sidekiq::Config::DEFAULTS[:dead_timeout_in_seconds]) + transaction.zremrangebyrank(name, 0, - Sidekiq::Config::DEFAULTS[:dead_max_jobs]) end end if opts[:notify_failure] != false job = Sidekiq.load_json(message) r = RuntimeError.new("Job killed by API") r.set_backtrace(caller) - Sidekiq.death_handlers.each do |handle| + Sidekiq.default_configuration.death_handlers.each do |handle| handle.call(job, r) end end true end # Enqueue all dead jobs def retry_all each(&:retry) while size > 0 end - - # The maximum size of the Dead set. Older entries will be trimmed - # to stay within this limit. Default value is 10,000. - def self.max_jobs - Sidekiq[:dead_max_jobs] - end - - # The time limit for entries within the Dead set. Older entries will be thrown away. - # Default value is six months. - def self.timeout - Sidekiq[:dead_timeout_in_seconds] - end end ## # Enumerates the set of Sidekiq processes which are actively working # right now. Each process sends a heartbeat to Redis every 5 seconds @@ -907,11 +838,11 @@ # dont run cleanup more than once per minute return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) } count = 0 Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a + procs = conn.sscan("processes").to_a heartbeats = conn.pipelined { |pipeline| procs.each do |key| pipeline.hget(key, "info") end } @@ -927,11 +858,11 @@ count end def each result = Sidekiq.redis { |conn| - procs = conn.sscan_each("processes").to_a.sort + procs = conn.sscan("processes").to_a.sort # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way conn.pipelined do |pipeline| @@ -1019,11 +950,11 @@ def tag self["tag"] end def labels - Array(self["labels"]) + self["labels"].to_a end def [](key) @attribs[key] end @@ -1101,35 +1032,28 @@ class WorkSet include Enumerable def each(&block) results = [] - procs = nil - all_works = nil - Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a.sort - - all_works = conn.pipelined do |pipeline| - procs.each do |key| + procs = conn.sscan("processes").to_a + procs.sort.each do |key| + valid, workers = conn.pipelined { |pipeline| + pipeline.exists(key) pipeline.hgetall("#{key}:work") + } + next unless valid > 0 + workers.each_pair do |tid, json| + hsh = Sidekiq.load_json(json) + p = hsh["payload"] + # avoid breaking API, this is a side effect of the JSON optimization in #4316 + hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String) + results << [key, tid, hsh] end end end - procs.zip(all_works).each do |key, workers| - workers.each_pair do |tid, json| - next if json.empty? - - hsh = Sidekiq.load_json(json) - p = hsh["payload"] - # avoid breaking API, this is a side effect of the JSON optimization in #4316 - hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String) - results << [key, tid, hsh] - end - end - results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block) end # Note that #size is only as accurate as Sidekiq's heartbeat, # which happens every 5 seconds. It is NOT real-time. @@ -1137,10 +1061,10 @@ # Not very efficient if you have lots of Sidekiq # processes but the alternative is a global counter # which can easily get out of sync with crashy processes. def size Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a + procs = conn.sscan("processes").to_a if procs.empty? 0 else conn.pipelined { |pipeline| procs.each do |key|