lib/sidekiq/api.rb in sidekiq-5.2.10 vs lib/sidekiq/api.rb in sidekiq-6.0.0.pre1

- old
+ new

@@ -1,18 +1,18 @@ # frozen_string_literal: true -require 'sidekiq' -module Sidekiq +require "sidekiq" +module Sidekiq module RedisScanner def sscan(conn, key) - cursor = '0' + cursor = "0" result = [] loop do cursor, values = conn.sscan(key, cursor) result.push(*values) - break if cursor == '0' + break if cursor == "0" end result end end @@ -62,65 +62,69 @@ def queues Sidekiq::Stats::Queues.new.lengths end def fetch_stats! - pipe1_res = Sidekiq.redis do |conn| + pipe1_res = Sidekiq.redis { |conn| conn.pipelined do - conn.get('stat:processed') - conn.get('stat:failed') - conn.zcard('schedule') - conn.zcard('retry') - conn.zcard('dead') - conn.scard('processes') - conn.lrange('queue:default', -1, -1) + conn.get("stat:processed") + conn.get("stat:failed") + conn.zcard("schedule") + conn.zcard("retry") + conn.zcard("dead") + conn.scard("processes") + conn.lrange("queue:default", -1, -1) end - end + } - processes = Sidekiq.redis do |conn| - sscan(conn, 'processes') - end + processes = Sidekiq.redis { |conn| + sscan(conn, "processes") + } - queues = Sidekiq.redis do |conn| - sscan(conn, 'queues') - end + queues = Sidekiq.redis { |conn| + sscan(conn, "queues") + } - pipe2_res = Sidekiq.redis do |conn| + pipe2_res = Sidekiq.redis { |conn| conn.pipelined do - processes.each {|key| conn.hget(key, 'busy') } + processes.each {|key| conn.hget(key, "busy") } queues.each {|queue| conn.llen("queue:#{queue}") } end - end + } s = processes.size workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+) enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+) default_queue_latency = if (entry = pipe1_res[6].first) - job = Sidekiq.load_json(entry) rescue {} - now = Time.now.to_f - thence = job['enqueued_at'] || now - now - thence - else - 0 - end + job = begin + Sidekiq.load_json(entry) + rescue + {} + end + now = Time.now.to_f + thence = job["enqueued_at"] || now + now - thence + else + 0 + end @stats = { - processed: pipe1_res[0].to_i, - failed: pipe1_res[1].to_i, - scheduled_size: pipe1_res[2], - retry_size: pipe1_res[3], - dead_size: pipe1_res[4], - processes_size: pipe1_res[5], + processed: pipe1_res[0].to_i, + failed: pipe1_res[1].to_i, + scheduled_size: pipe1_res[2], + retry_size: pipe1_res[3], + dead_size: pipe1_res[4], + processes_size: pipe1_res[5], default_queue_latency: default_queue_latency, - workers_size: workers_size, - enqueued: enqueued + workers_size: workers_size, + enqueued: enqueued, } end def reset(*stats) - all = %w(failed processed) + all = %w[failed processed] stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s) mset_args = [] stats.each do |stat| mset_args << "stat:#{stat}" @@ -140,34 +144,31 @@ class Queues include RedisScanner def lengths Sidekiq.redis do |conn| - queues = sscan(conn, 'queues') + queues = sscan(conn, "queues") - lengths = conn.pipelined do + lengths = conn.pipelined { queues.each do |queue| conn.llen("queue:#{queue}") end - end + } i = 0 - array_of_arrays = queues.inject({}) do |memo, queue| + array_of_arrays = queues.each_with_object({}) { |queue, memo| memo[queue] = lengths[i] i += 1 - memo - end.sort_by { |_, size| size } + }.sort_by { |_, size| size } Hash[array_of_arrays.reverse] end end end class History def initialize(days_previous, start_date = nil) - #we only store five years of data in Redis - raise ArgumentError if days_previous < 1 || days_previous > (5 * 365) @days_previous = days_previous @start_date = start_date || Time.now.utc.to_date end def processed @@ -228,16 +229,16 @@ ## # Return all known queues within Redis. # def self.all - Sidekiq.redis { |c| sscan(c, 'queues') }.sort.map { |q| Sidekiq::Queue.new(q) } + Sidekiq.redis { |c| sscan(c, "queues") }.sort.map { |q| Sidekiq::Queue.new(q) } end attr_reader :name - def initialize(name="default") + def initialize(name = "default") @name = name.to_s @rname = "queue:#{name}" end def size @@ -253,32 +254,32 @@ # Calculates this queue's latency, the difference in seconds since the oldest # job in the queue was enqueued. # # @return Float def latency - entry = Sidekiq.redis do |conn| + entry = Sidekiq.redis { |conn| conn.lrange(@rname, -1, -1) - end.first + }.first return 0 unless entry job = Sidekiq.load_json(entry) now = Time.now.to_f - thence = job['enqueued_at'] || now + thence = job["enqueued_at"] || now now - thence end def each initial_size = size deleted_size = 0 page = 0 page_size = 50 - while true do + loop do range_start = page * page_size - deleted_size range_end = range_start + page_size - 1 - entries = Sidekiq.redis do |conn| + entries = Sidekiq.redis { |conn| conn.lrange @rname, range_start, range_end - end + } break if entries.empty? page += 1 entries.each do |entry| yield Job.new(entry, @name) end @@ -315,15 +316,15 @@ # class Job attr_reader :item attr_reader :value - def initialize(item, queue_name=nil) + def initialize(item, queue_name = nil) @args = nil @value = item @item = item.is_a?(Hash) ? item : parse(item) - @queue = queue_name || @item['queue'] + @queue = queue_name || @item["queue"] end def parse(item) Sidekiq.load_json(item) rescue JSON::ParserError @@ -334,88 +335,86 @@ @args = [item] {} end def klass - self['class'] + self["class"] end def display_class # Unwrap known wrappers so they show up in a human-friendly manner in the Web UI @klass ||= case klass when /\ASidekiq::Extensions::Delayed/ safe_load(args[0], klass) do |target, method, _| "#{target}.#{method}" end when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" - job_class = @item['wrapped'] || args[0] - if 'ActionMailer::DeliveryJob' == job_class + job_class = @item["wrapped"] || args[0] + if job_class == "ActionMailer::DeliveryJob" # MailerClass#mailer_method - args[0]['arguments'][0..1].join('#') + args[0]["arguments"][0..1].join("#") else - job_class + job_class end else klass - end + end end def display_args # Unwrap known wrappers so they show up in a human-friendly manner in the Web UI @display_args ||= case klass when /\ASidekiq::Extensions::Delayed/ safe_load(args[0], args) do |_, _, arg| arg end when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper" - job_args = self['wrapped'] ? args[0]["arguments"] : [] - if 'ActionMailer::DeliveryJob' == (self['wrapped'] || args[0]) + job_args = self["wrapped"] ? args[0]["arguments"] : [] + if (self["wrapped"] || args[0]) == "ActionMailer::DeliveryJob" # remove MailerClass, mailer_method and 'deliver_now' job_args.drop(3) else job_args end else - if self['encrypt'] + if self["encrypt"] # no point in showing 150+ bytes of random garbage - args[-1] = '[encrypted data]' + args[-1] = "[encrypted data]" end args - end + end end def args - @args || @item['args'] + @args || @item["args"] end def jid - self['jid'] + self["jid"] end def enqueued_at - self['enqueued_at'] ? Time.at(self['enqueued_at']).utc : nil + self["enqueued_at"] ? Time.at(self["enqueued_at"]).utc : nil end def created_at - Time.at(self['created_at'] || self['enqueued_at'] || 0).utc + Time.at(self["created_at"] || self["enqueued_at"] || 0).utc end - def queue - @queue - end + attr_reader :queue def latency now = Time.now.to_f - now - (@item['enqueued_at'] || @item['created_at'] || now) + now - (@item["enqueued_at"] || @item["created_at"] || now) end ## # Remove this job from the queue. def delete - count = Sidekiq.redis do |conn| + count = Sidekiq.redis { |conn| conn.lrem("queue:#{@queue}", 1, @value) - end + } count != 0 end def [](name) # nil will happen if the JSON fails to parse. @@ -425,18 +424,16 @@ end private def safe_load(content, default) - begin - yield(*YAML.load(content)) - rescue => ex - # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into - # memory yet so the YAML can't be loaded. - Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == 'development' - default - end + yield(*YAML.load(content)) + rescue => ex + # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into + # memory yet so the YAML can't be loaded. + Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == "development" + default end end class SortedEntry < Job attr_reader :score @@ -473,11 +470,11 @@ end def retry remove_job do |message| msg = Sidekiq.load_json(message) - msg['retry_count'] -= 1 if msg['retry_count'] + msg["retry_count"] -= 1 if msg["retry_count"] Sidekiq::Client.push(msg) end end ## @@ -487,35 +484,35 @@ DeadSet.new.kill(message) end end def error? - !!item['error_class'] + !!item["error_class"] end private def remove_job Sidekiq.redis do |conn| - results = conn.multi do + results = conn.multi { conn.zrangebyscore(parent.name, score, score) conn.zremrangebyscore(parent.name, score, score) - end.first + }.first if results.size == 1 yield results.first else # multiple jobs with the same score # find the one with the right JID and push it - hash = results.group_by do |message| + hash = results.group_by { |message| if message.index(jid) msg = Sidekiq.load_json(message) - msg['jid'] == jid + msg["jid"] == jid else false end - end + } msg = hash.fetch(true, []).first yield msg if msg # push the rest back onto the sorted set @@ -525,11 +522,10 @@ end end end end end - end class SortedSet include Enumerable @@ -551,11 +547,10 @@ end alias_method :💣, :clear end class JobSet < SortedSet - def schedule(timestamp, message) Sidekiq.redis do |conn| conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message)) end end @@ -564,48 +559,47 @@ initial_size = @_size offset_size = 0 page = -1 page_size = 50 - while true do + loop do range_start = page * page_size + offset_size range_end = range_start + page_size - 1 - elements = Sidekiq.redis do |conn| + elements = Sidekiq.redis { |conn| conn.zrange name, range_start, range_end, with_scores: true - end + } break if elements.empty? page -= 1 - elements.reverse.each do |element, score| + elements.reverse_each do |element, score| yield SortedEntry.new(self, score, element) end offset_size = initial_size - @_size end end def fetch(score, jid = nil) - elements = Sidekiq.redis do |conn| + elements = Sidekiq.redis { |conn| conn.zrangebyscore(name, score, score) - end + } - elements.inject([]) do |result, element| + elements.each_with_object([]) do |element, result| entry = SortedEntry.new(self, score, element) if jid result << entry if entry.jid == jid else result << entry end - result end end ## # Find the job with the given JID within this sorted set. # # This is a slow, inefficient operation. Do not use under # normal conditions. Sidekiq Pro contains a faster version. def find_job(jid) - self.detect { |j| j.jid == jid } + detect { |j| j.jid == jid } end def delete_by_value(name, value) Sidekiq.redis do |conn| ret = conn.zrem(name, value) @@ -622,11 +616,10 @@ if message["jid"] == jid ret = conn.zrem(name, element) @_size -= 1 if ret break ret end - false end end end alias_method :delete, :delete_by_jid @@ -644,11 +637,11 @@ # scheduled.args[0] == 'User' && # scheduled.args[1] == 'setup_new_subscriber' # end.map(&:delete) class ScheduledSet < JobSet def initialize - super 'schedule' + super "schedule" end end ## # Allows enumeration of retries within Sidekiq. @@ -662,40 +655,36 @@ # retri.args[0] == 'User' && # retri.args[1] == 'setup_new_subscriber' # end.map(&:delete) class RetrySet < JobSet def initialize - super 'retry' + super "retry" end def retry_all - while size > 0 - each(&:retry) - end + each(&:retry) while size > 0 end def kill_all - while size > 0 - each(&:kill) - end + each(&:kill) while size > 0 end end ## # Allows enumeration of dead jobs within Sidekiq. # class DeadSet < JobSet def initialize - super 'dead' + super "dead" end - def kill(message, opts={}) + def kill(message, opts = {}) now = Time.now.to_f Sidekiq.redis do |conn| conn.multi do conn.zadd(name, now.to_s, message) - conn.zremrangebyscore(name, '-inf', now - self.class.timeout) + conn.zremrangebyscore(name, "-inf", now - self.class.timeout) conn.zremrangebyrank(name, 0, - self.class.max_jobs) end end if opts[:notify_failure] != false @@ -708,13 +697,11 @@ end true end def retry_all - while size > 0 - each(&:retry) - end + each(&:retry) while size > 0 end def self.max_jobs Sidekiq.options[:dead_max_jobs] end @@ -733,59 +720,59 @@ # class ProcessSet include Enumerable include RedisScanner - def initialize(clean_plz=true) + def initialize(clean_plz = true) cleanup if clean_plz end # Cleans up dead processes recorded in Redis. # Returns the number of processes cleaned. def cleanup count = 0 Sidekiq.redis do |conn| - procs = sscan(conn, 'processes').sort - heartbeats = conn.pipelined do + procs = sscan(conn, "processes").sort + heartbeats = conn.pipelined { procs.each do |key| - conn.hget(key, 'info') + conn.hget(key, "info") end - end + } # the hash named key has an expiry of 60 seconds. # if it's not found, that means the process has not reported # in to Redis and probably died. to_prune = [] heartbeats.each_with_index do |beat, i| to_prune << procs[i] if beat.nil? end - count = conn.srem('processes', to_prune) unless to_prune.empty? + count = conn.srem("processes", to_prune) unless to_prune.empty? end count end def each - procs = Sidekiq.redis { |conn| sscan(conn, 'processes') }.sort + procs = Sidekiq.redis { |conn| sscan(conn, "processes") }.sort Sidekiq.redis do |conn| # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way - result = conn.pipelined do + result = conn.pipelined { procs.each do |key| - conn.hmget(key, 'info', 'busy', 'beat', 'quiet') + conn.hmget(key, "info", "busy", "beat", "quiet") end - end + } result.each do |info, busy, at_s, quiet| # If a process is stopped between when we query Redis for `procs` and # when we query for `result`, we will have an item in `result` that is # composed of `nil` values. next if info.nil? hash = Sidekiq.load_json(info) - yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f, 'quiet' => quiet)) + yield Process.new(hash.merge("busy" => busy.to_i, "beat" => at_s.to_f, "quiet" => quiet)) end end nil end @@ -793,21 +780,21 @@ # This method is not guaranteed accurate since it does not prune the set # based on current heartbeat. #each does that and ensures the set only # contains Sidekiq processes which have sent a heartbeat within the last # 60 seconds. def size - Sidekiq.redis { |conn| conn.scard('processes') } + Sidekiq.redis { |conn| conn.scard("processes") } end # Returns the identity of the current cluster leader or "" if no leader. # This is a Sidekiq Enterprise feature, will always return "" in Sidekiq # or Sidekiq Pro. def leader @leader ||= begin x = Sidekiq.redis {|c| c.get("dear-leader") } # need a non-falsy value so we can memoize - x = "" unless x + x ||= "" x end end end @@ -830,39 +817,39 @@ def initialize(hash) @attribs = hash end def tag - self['tag'] + self["tag"] end def labels - Array(self['labels']) + Array(self["labels"]) end def [](key) @attribs[key] end def identity - self['identity'] + self["identity"] end def quiet! - signal('TSTP') + signal("TSTP") end def stop! - signal('TERM') + signal("TERM") end def dump_threads - signal('TTIN') + signal("TTIN") end def stopping? - self['quiet'] == 'true' + self["quiet"] == "true" end private def signal(sig) @@ -872,11 +859,10 @@ c.lpush(key, sig) c.expire(key, 60) end end end - end ## # A worker is a thread that is currently processing a job. # Programmatic access to the current active worker set. @@ -901,16 +887,16 @@ include Enumerable include RedisScanner def each Sidekiq.redis do |conn| - procs = sscan(conn, 'processes') + procs = sscan(conn, "processes") procs.sort.each do |key| - valid, workers = conn.pipelined do - conn.exists?(key) + valid, workers = conn.pipelined { + conn.exists(key) conn.hgetall("#{key}:workers") - end + } next unless valid workers.each_pair do |tid, json| yield key, tid, Sidekiq.load_json(json) end end @@ -923,20 +909,19 @@ # Not very efficient if you have lots of Sidekiq # processes but the alternative is a global counter # which can easily get out of sync with crashy processes. def size Sidekiq.redis do |conn| - procs = sscan(conn, 'processes') + procs = sscan(conn, "processes") if procs.empty? 0 else - conn.pipelined do + conn.pipelined { procs.each do |key| - conn.hget(key, 'busy') + conn.hget(key, "busy") end - end.map(&:to_i).inject(:+) + }.map(&:to_i).inject(:+) end end end end - end