lib/sidekiq/api.rb in sidekiq-5.2.10 vs lib/sidekiq/api.rb in sidekiq-6.0.0.pre1
- old (sidekiq-5.2.10)
+ new (sidekiq-6.0.0.pre1)
@@ -1,18 +1,18 @@
# frozen_string_literal: true
-require 'sidekiq'
+require "sidekiq"
module Sidekiq
module RedisScanner
def sscan(conn, key)
- cursor = '0'
+ cursor = "0"
result = []
loop do
cursor, values = conn.sscan(key, cursor)
result.push(*values)
- break if cursor == '0'
+ break if cursor == "0"
end
result
end
end
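The sscan helper above exists because SMEMBERS on a large set blocks Redis; SSCAN walks the set in batches with a cursor that comes back around to "0" once the scan is complete. A minimal standalone sketch of the same cursor protocol, assuming a plain redis-rb connection (the key name is illustrative):

  require "redis"

  redis = Redis.new
  cursor = "0"
  members = []
  loop do
    # SSCAN returns [next_cursor, batch_of_members]
    cursor, batch = redis.sscan("processes", cursor)
    members.concat(batch)
    break if cursor == "0" # cursor wrapped around: scan complete
  end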
@@ -62,65 +62,69 @@
def queues
Sidekiq::Stats::Queues.new.lengths
end
def fetch_stats!
- pipe1_res = Sidekiq.redis do |conn|
+ pipe1_res = Sidekiq.redis { |conn|
conn.pipelined do
- conn.get('stat:processed')
- conn.get('stat:failed')
- conn.zcard('schedule')
- conn.zcard('retry')
- conn.zcard('dead')
- conn.scard('processes')
- conn.lrange('queue:default', -1, -1)
+ conn.get("stat:processed")
+ conn.get("stat:failed")
+ conn.zcard("schedule")
+ conn.zcard("retry")
+ conn.zcard("dead")
+ conn.scard("processes")
+ conn.lrange("queue:default", -1, -1)
end
- end
+ }
- processes = Sidekiq.redis do |conn|
- sscan(conn, 'processes')
- end
+ processes = Sidekiq.redis { |conn|
+ sscan(conn, "processes")
+ }
- queues = Sidekiq.redis do |conn|
- sscan(conn, 'queues')
- end
+ queues = Sidekiq.redis { |conn|
+ sscan(conn, "queues")
+ }
- pipe2_res = Sidekiq.redis do |conn|
+ pipe2_res = Sidekiq.redis { |conn|
conn.pipelined do
- processes.each {|key| conn.hget(key, 'busy') }
+ processes.each {|key| conn.hget(key, "busy") }
queues.each {|queue| conn.llen("queue:#{queue}") }
end
- end
+ }
s = processes.size
workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)
default_queue_latency = if (entry = pipe1_res[6].first)
- job = Sidekiq.load_json(entry) rescue {}
- now = Time.now.to_f
- thence = job['enqueued_at'] || now
- now - thence
- else
- 0
- end
+ job = begin
+ Sidekiq.load_json(entry)
+ rescue
+ {}
+ end
+ now = Time.now.to_f
+ thence = job["enqueued_at"] || now
+ now - thence
+ else
+ 0
+ end
@stats = {
- processed:             pipe1_res[0].to_i,
- failed:                pipe1_res[1].to_i,
- scheduled_size:        pipe1_res[2],
- retry_size:            pipe1_res[3],
- dead_size:             pipe1_res[4],
- processes_size:        pipe1_res[5],
+ processed: pipe1_res[0].to_i,
+ failed: pipe1_res[1].to_i,
+ scheduled_size: pipe1_res[2],
+ retry_size: pipe1_res[3],
+ dead_size: pipe1_res[4],
+ processes_size: pipe1_res[5],
default_queue_latency: default_queue_latency,
- workers_size:          workers_size,
- enqueued:              enqueued
+ workers_size: workers_size,
+ enqueued: enqueued,
}
end
def reset(*stats)
- all = %w(failed processed)
+ all = %w[failed processed]
stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
mset_args = []
stats.each do |stat|
mset_args << "stat:#{stat}"
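fetch_stats! above issues one pipelined round-trip for the global counters and a second for the per-process and per-queue numbers, then caches everything in @stats. A hedged usage sketch of the readers backed by those keys (accessor names follow the hash keys built above):

  require "sidekiq/api"

  stats = Sidekiq::Stats.new
  stats.processed             # lifetime count from "stat:processed"
  stats.failed                # lifetime count from "stat:failed"
  stats.default_queue_latency # seconds the oldest "default" job has waited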
@@ -140,34 +144,31 @@
class Queues
include RedisScanner
def lengths
Sidekiq.redis do |conn|
- queues = sscan(conn, 'queues')
+ queues = sscan(conn, "queues")
- lengths = conn.pipelined do
+ lengths = conn.pipelined {
queues.each do |queue|
conn.llen("queue:#{queue}")
end
- end
+ }
i = 0
- array_of_arrays = queues.inject({}) do |memo, queue|
+ array_of_arrays = queues.each_with_object({}) { |queue, memo|
memo[queue] = lengths[i]
i += 1
- memo
- end.sort_by { |_, size| size }
+ }.sort_by { |_, size| size }
Hash[array_of_arrays.reverse]
end
end
end
class History
def initialize(days_previous, start_date = nil)
- #we only store five years of data in Redis
- raise ArgumentError if days_previous < 1 || days_previous > (5 * 365)
@days_previous = days_previous
@start_date = start_date || Time.now.utc.to_date
end
def processed
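Queues#lengths in the hunk above pipelines one LLEN per known queue and returns a hash ordered largest queue first. Usage sketch (queue names and sizes are illustrative):

  Sidekiq::Stats::Queues.new.lengths
  # => {"critical" => 120, "default" => 34, "low" => 0}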
@@ -228,16 +229,16 @@
##
# Return all known queues within Redis.
#
def self.all
- Sidekiq.redis { |c| sscan(c, 'queues') }.sort.map { |q| Sidekiq::Queue.new(q) }
+ Sidekiq.redis { |c| sscan(c, "queues") }.sort.map { |q| Sidekiq::Queue.new(q) }
end
attr_reader :name
- def initialize(name="default")
+ def initialize(name = "default")
@name = name.to_s
@rname = "queue:#{name}"
end
def size
@@ -253,32 +254,32 @@
# Calculates this queue's latency, the difference in seconds since the oldest
# job in the queue was enqueued.
#
# @return Float
def latency
- entry = Sidekiq.redis do |conn|
+ entry = Sidekiq.redis { |conn|
conn.lrange(@rname, -1, -1)
- end.first
+ }.first
return 0 unless entry
job = Sidekiq.load_json(entry)
now = Time.now.to_f
- thence = job['enqueued_at'] || now
+ thence = job["enqueued_at"] || now
now - thence
end
def each
initial_size = size
deleted_size = 0
page = 0
page_size = 50
- while true do
+ loop do
range_start = page * page_size - deleted_size
range_end = range_start + page_size - 1
- entries = Sidekiq.redis do |conn|
+ entries = Sidekiq.redis { |conn|
conn.lrange @rname, range_start, range_end
- end
+ }
break if entries.empty?
page += 1
entries.each do |entry|
yield Job.new(entry, @name)
end
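Queue#each above pages through the underlying Redis list 50 entries at a time, shrinking the window by however many entries were deleted mid-scan so the pagination stays stable. Usage sketch; the worker class name is hypothetical:

  queue = Sidekiq::Queue.new("default")
  queue.latency # seconds since the oldest entry was enqueued

  queue.each do |job|
    job.delete if job.klass == "ObsoleteWorker" # hypothetical class name
  end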
@@ -315,15 +316,15 @@
#
class Job
attr_reader :item
attr_reader :value
- def initialize(item, queue_name=nil)
+ def initialize(item, queue_name = nil)
@args = nil
@value = item
@item = item.is_a?(Hash) ? item : parse(item)
- @queue = queue_name || @item['queue']
+ @queue = queue_name || @item["queue"]
end
def parse(item)
Sidekiq.load_json(item)
rescue JSON::ParserError
@@ -334,88 +335,86 @@
@args = [item]
{}
end
def klass
- self['class']
+ self["class"]
end
def display_class
# Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
@klass ||= case klass
when /\ASidekiq::Extensions::Delayed/
safe_load(args[0], klass) do |target, method, _|
"#{target}.#{method}"
end
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
- job_class = @item['wrapped'] || args[0]
- if 'ActionMailer::DeliveryJob' == job_class
+ job_class = @item["wrapped"] || args[0]
+ if job_class == "ActionMailer::DeliveryJob"
# MailerClass#mailer_method
- args[0]['arguments'][0..1].join('#')
+ args[0]["arguments"][0..1].join("#")
else
job_class
end
else
klass
end
end
def display_args
# Unwrap known wrappers so they show up in a human-friendly manner in the Web UI
@display_args ||= case klass
when /\ASidekiq::Extensions::Delayed/
safe_load(args[0], args) do |_, _, arg|
arg
end
when "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper"
- job_args = self['wrapped'] ? args[0]["arguments"] : []
- if 'ActionMailer::DeliveryJob' == (self['wrapped'] || args[0])
+ job_args = self["wrapped"] ? args[0]["arguments"] : []
+ if (self["wrapped"] || args[0]) == "ActionMailer::DeliveryJob"
# remove MailerClass, mailer_method and 'deliver_now'
job_args.drop(3)
else
job_args
end
else
- if self['encrypt']
+ if self["encrypt"]
# no point in showing 150+ bytes of random garbage
- args[-1] = '[encrypted data]'
+ args[-1] = "[encrypted data]"
end
args
end
end
def args
- @args || @item['args']
+ @args || @item["args"]
end
def jid
- self['jid']
+ self["jid"]
end
def enqueued_at
- self['enqueued_at'] ? Time.at(self['enqueued_at']).utc : nil
+ self["enqueued_at"] ? Time.at(self["enqueued_at"]).utc : nil
end
def created_at
- Time.at(self['created_at'] || self['enqueued_at'] || 0).utc
+ Time.at(self["created_at"] || self["enqueued_at"] || 0).utc
end
- def queue
- @queue
- end
+ attr_reader :queue
def latency
now = Time.now.to_f
- now - (@item['enqueued_at'] || @item['created_at'] || now)
+ now - (@item["enqueued_at"] || @item["created_at"] || now)
end
##
# Remove this job from the queue.
def delete
- count = Sidekiq.redis do |conn|
+ count = Sidekiq.redis { |conn|
conn.lrem("queue:#{@queue}", 1, @value)
- end
+ }
count != 0
end
def [](name)
# nil will happen if the JSON fails to parse.
@@ -425,18 +424,16 @@
end
private
def safe_load(content, default)
- begin
- yield(*YAML.load(content))
- rescue => ex
- # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
- # memory yet so the YAML can't be loaded.
- Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == 'development'
- default
- end
+ yield(*YAML.load(content))
+ rescue => ex
+ # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
+ # memory yet so the YAML can't be loaded.
+ Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == "development"
+ default
end
end
class SortedEntry < Job
attr_reader :score
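The rewritten safe_load drops the explicit begin/end because a Ruby method body is an implicit begin block, so a rescue clause can attach directly to the def. A minimal illustration of the idiom (method and names are hypothetical, not from this file):

  require "json"

  def parse_or_default(raw)
    JSON.parse(raw)
  rescue JSON::ParserError
    {} # fall back, as safe_load falls back to `default`
  end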
@@ -473,11 +470,11 @@
end
def retry
remove_job do |message|
msg = Sidekiq.load_json(message)
- msg['retry_count'] -= 1 if msg['retry_count']
+ msg["retry_count"] -= 1 if msg["retry_count"]
Sidekiq::Client.push(msg)
end
end
##
@@ -487,35 +484,35 @@
DeadSet.new.kill(message)
end
end
def error?
- !!item['error_class']
+ !!item["error_class"]
end
private
def remove_job
Sidekiq.redis do |conn|
- results = conn.multi do
+ results = conn.multi {
conn.zrangebyscore(parent.name, score, score)
conn.zremrangebyscore(parent.name, score, score)
- end.first
+ }.first
if results.size == 1
yield results.first
else
# multiple jobs with the same score
# find the one with the right JID and push it
- hash = results.group_by do |message|
+ hash = results.group_by { |message|
if message.index(jid)
msg = Sidekiq.load_json(message)
- msg['jid'] == jid
+ msg["jid"] == jid
else
false
end
- end
+ }
msg = hash.fetch(true, []).first
yield msg if msg
# push the rest back onto the sorted set
@@ -525,11 +522,10 @@
end
end
end
end
end
-
end
class SortedSet
include Enumerable
@@ -551,11 +547,10 @@
end
alias_method :💣, :clear
end
class JobSet < SortedSet
-
def schedule(timestamp, message)
Sidekiq.redis do |conn|
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
end
end
@@ -564,48 +559,47 @@
initial_size = @_size
offset_size = 0
page = -1
page_size = 50
- while true do
+ loop do
range_start = page * page_size + offset_size
range_end = range_start + page_size - 1
- elements = Sidekiq.redis do |conn|
+ elements = Sidekiq.redis { |conn|
conn.zrange name, range_start, range_end, with_scores: true
- end
+ }
break if elements.empty?
page -= 1
- elements.reverse.each do |element, score|
+ elements.reverse_each do |element, score|
yield SortedEntry.new(self, score, element)
end
offset_size = initial_size - @_size
end
end
def fetch(score, jid = nil)
- elements = Sidekiq.redis do |conn|
+ elements = Sidekiq.redis { |conn|
conn.zrangebyscore(name, score, score)
- end
+ }
- elements.inject([]) do |result, element|
+ elements.each_with_object([]) do |element, result|
entry = SortedEntry.new(self, score, element)
if jid
result << entry if entry.jid == jid
else
result << entry
end
- result
end
end
##
# Find the job with the given JID within this sorted set.
#
# This is a slow, inefficient operation. Do not use under
# normal conditions. Sidekiq Pro contains a faster version.
def find_job(jid)
- self.detect { |j| j.jid == jid }
+ detect { |j| j.jid == jid }
end
def delete_by_value(name, value)
Sidekiq.redis do |conn|
ret = conn.zrem(name, value)
@@ -622,11 +616,10 @@
if message["jid"] == jid
ret = conn.zrem(name, element)
@_size -= 1 if ret
break ret
end
- false
end
end
end
alias_method :delete, :delete_by_jid
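As the comment above warns, find_job scans the whole sorted set client-side rather than indexing by JID. A usage sketch against the retry set (the JID is illustrative):

  rs = Sidekiq::RetrySet.new
  entry = rs.find_job("0123456789abcdef01234567") # hypothetical JID
  entry.retry if entry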
@@ -644,11 +637,11 @@
# scheduled.args[0] == 'User' &&
# scheduled.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class ScheduledSet < JobSet
def initialize
- super 'schedule'
+ super "schedule"
end
end
##
# Allows enumeration of retries within Sidekiq.
@@ -662,40 +655,36 @@
# retri.args[0] == 'User' &&
# retri.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class RetrySet < JobSet
def initialize
- super 'retry'
+ super "retry"
end
def retry_all
- while size > 0
- each(&:retry)
- end
+ each(&:retry) while size > 0
end
def kill_all
- while size > 0
- each(&:kill)
- end
+ each(&:kill) while size > 0
end
end
##
# Allows enumeration of dead jobs within Sidekiq.
#
class DeadSet < JobSet
def initialize
- super 'dead'
+ super "dead"
end
- def kill(message, opts={})
+ def kill(message, opts = {})
now = Time.now.to_f
Sidekiq.redis do |conn|
conn.multi do
conn.zadd(name, now.to_s, message)
- conn.zremrangebyscore(name, '-inf', now - self.class.timeout)
+ conn.zremrangebyscore(name, "-inf", now - self.class.timeout)
conn.zremrangebyrank(name, 0, - self.class.max_jobs)
end
end
if opts[:notify_failure] != false
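retry_all and kill_all loop with `each(&:retry) while size > 0` because each pass mutates the set being iterated, and deletions shift the pagination window, so a single pass can miss entries. Usage sketch:

  Sidekiq::RetrySet.new.kill_all  # move every pending retry to the dead set
  Sidekiq::DeadSet.new.retry_all  # push every dead job back onto its queue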
@@ -708,13 +697,11 @@
end
true
end
def retry_all
- while size > 0
- each(&:retry)
- end
+ each(&:retry) while size > 0
end
def self.max_jobs
Sidekiq.options[:dead_max_jobs]
end
@@ -733,59 +720,59 @@
#
class ProcessSet
include Enumerable
include RedisScanner
- def initialize(clean_plz=true)
+ def initialize(clean_plz = true)
cleanup if clean_plz
end
# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
def cleanup
count = 0
Sidekiq.redis do |conn|
- procs = sscan(conn, 'processes').sort
- heartbeats = conn.pipelined do
+ procs = sscan(conn, "processes").sort
+ heartbeats = conn.pipelined {
procs.each do |key|
- conn.hget(key, 'info')
+ conn.hget(key, "info")
end
- end
+ }
# the hash named key has an expiry of 60 seconds.
# if it's not found, that means the process has not reported
# in to Redis and probably died.
to_prune = []
heartbeats.each_with_index do |beat, i|
to_prune << procs[i] if beat.nil?
end
- count = conn.srem('processes', to_prune) unless to_prune.empty?
+ count = conn.srem("processes", to_prune) unless to_prune.empty?
end
count
end
def each
- procs = Sidekiq.redis { |conn| sscan(conn, 'processes') }.sort
+ procs = Sidekiq.redis { |conn| sscan(conn, "processes") }.sort
Sidekiq.redis do |conn|
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
- result = conn.pipelined do
+ result = conn.pipelined {
procs.each do |key|
- conn.hmget(key, 'info', 'busy', 'beat', 'quiet')
+ conn.hmget(key, "info", "busy", "beat", "quiet")
end
- end
+ }
result.each do |info, busy, at_s, quiet|
# If a process is stopped between when we query Redis for `procs` and
# when we query for `result`, we will have an item in `result` that is
# composed of `nil` values.
next if info.nil?
hash = Sidekiq.load_json(info)
- yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f, 'quiet' => quiet))
+ yield Process.new(hash.merge("busy" => busy.to_i, "beat" => at_s.to_f, "quiet" => quiet))
end
end
nil
end
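ProcessSet#each above prunes processes whose heartbeat hash has expired, then yields one Process per live Sidekiq process with the pipelined "busy", "beat", and "quiet" fields merged in. Usage sketch (key names come from this file; the output format is illustrative):

  Sidekiq::ProcessSet.new.each do |process|
    puts "#{process["identity"]}: busy=#{process["busy"]} stopping=#{process.stopping?}"
  end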
@@ -793,21 +780,21 @@
# This method is not guaranteed accurate since it does not prune the set
# based on current heartbeat. #each does that and ensures the set only
# contains Sidekiq processes which have sent a heartbeat within the last
# 60 seconds.
def size
- Sidekiq.redis { |conn| conn.scard('processes') }
+ Sidekiq.redis { |conn| conn.scard("processes") }
end
# Returns the identity of the current cluster leader or "" if no leader.
# This is a Sidekiq Enterprise feature, will always return "" in Sidekiq
# or Sidekiq Pro.
def leader
@leader ||= begin
x = Sidekiq.redis {|c| c.get("dear-leader") }
# need a non-falsy value so we can memoize
- x = "" unless x
+ x ||= ""
x
end
end
end
@@ -830,39 +817,39 @@
def initialize(hash)
@attribs = hash
end
def tag
- self['tag']
+ self["tag"]
end
def labels
- Array(self['labels'])
+ Array(self["labels"])
end
def [](key)
@attribs[key]
end
def identity
- self['identity']
+ self["identity"]
end
def quiet!
- signal('TSTP')
+ signal("TSTP")
end
def stop!
- signal('TERM')
+ signal("TERM")
end
def dump_threads
- signal('TTIN')
+ signal("TTIN")
end
def stopping?
- self['quiet'] == 'true'
+ self["quiet"] == "true"
end
private
def signal(sig)
@@ -872,11 +859,10 @@
c.lpush(key, sig)
c.expire(key, 60)
end
end
end
-
end
##
# A worker is a thread that is currently processing a job.
# Programmatic access to the current active worker set.
@@ -901,16 +887,16 @@
include Enumerable
include RedisScanner
def each
Sidekiq.redis do |conn|
- procs = sscan(conn, 'processes')
+ procs = sscan(conn, "processes")
procs.sort.each do |key|
- valid, workers = conn.pipelined do
- conn.exists?(key)
+ valid, workers = conn.pipelined {
+ conn.exists(key)
conn.hgetall("#{key}:workers")
- end
+ }
next unless valid
workers.each_pair do |tid, json|
yield key, tid, Sidekiq.load_json(json)
end
end
@@ -923,20 +909,19 @@
# Not very efficient if you have lots of Sidekiq
# processes but the alternative is a global counter
# which can easily get out of sync with crashy processes.
def size
Sidekiq.redis do |conn|
- procs = sscan(conn, 'processes')
+ procs = sscan(conn, "processes")
if procs.empty?
0
else
- conn.pipelined do
+ conn.pipelined {
procs.each do |key|
- conn.hget(key, 'busy')
+ conn.hget(key, "busy")
end
- end.map(&:to_i).inject(:+)
+ }.map(&:to_i).inject(:+)
end
end
end
end
-
end
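Workers#each above yields the process identity, the thread id, and the decoded work unit for every in-flight job. A hedged usage sketch; the work-hash keys ("queue", "payload", "run_at") are the ones this era of Sidekiq stores per thread:

  Sidekiq::Workers.new.each do |process_id, thread_id, work|
    # work is a Hash like {"queue" => name, "payload" => job, "run_at" => epoch};
    # whether "payload" is a nested hash or a JSON string varies by version.
    puts "#{process_id}/#{thread_id}: working #{work["queue"]} since #{Time.at(work["run_at"])}"
  end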