lib/sidekiq/api.rb in sidekiq-3.3.0 vs lib/sidekiq/api.rb in sidekiq-3.3.1
- old (sidekiq-3.3.0)
+ new (sidekiq-3.3.1)
@@ -1,18 +1,95 @@
# encoding: utf-8
require 'sidekiq'
module Sidekiq
class Stats
+ def initialize
+ fetch_stats!
+ end
+
def processed
- Sidekiq.redis { |conn| conn.get("stat:processed") }.to_i
+ stat :processed
end
def failed
- Sidekiq.redis { |conn| conn.get("stat:failed") }.to_i
+ stat :failed
end
+ def scheduled_size
+ stat :scheduled_size
+ end
+
+ def retry_size
+ stat :retry_size
+ end
+
+ def dead_size
+ stat :dead_size
+ end
+
+ def enqueued
+ stat :enqueued
+ end
+
+ def processes_size
+ stat :processes_size
+ end
+
+ def workers_size
+ stat :workers_size
+ end
+
+ def default_queue_latency
+ stat :default_queue_latency
+ end
+
+ def fetch_stats!
+ pipe1_res = Sidekiq.redis do |conn|
+ conn.pipelined do
+ conn.get('stat:processed'.freeze)
+ conn.get('stat:failed'.freeze)
+ conn.zcard('schedule'.freeze)
+ conn.zcard('retry'.freeze)
+ conn.zcard('dead'.freeze)
+ conn.scard('processes'.freeze)
+ conn.lrange('queue:default'.freeze, -1, -1)
+ conn.smembers('processes'.freeze)
+ conn.smembers('queues'.freeze)
+ end
+ end
+
+ pipe2_res = Sidekiq.redis do |conn|
+ conn.pipelined do
+ pipe1_res[7].each {|key| conn.hget(key, 'busy'.freeze) }
+ pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") }
+ end
+ end
+
+ s = pipe1_res[7].size
+ workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
+ enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)
+
+ default_queue_latency = if (entry = pipe1_res[6].first)
+ Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'.freeze]
+ else
+ 0
+ end
+ @stats = {
+ processed: pipe1_res[0].to_i,
+ failed: pipe1_res[1].to_i,
+ scheduled_size: pipe1_res[2],
+ retry_size: pipe1_res[3],
+ dead_size: pipe1_res[4],
+ processes_size: pipe1_res[5],
+
+ default_queue_latency: default_queue_latency,
+ workers_size: workers_size,
+ enqueued: enqueued
+ }
+ end
+
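For context, a minimal usage sketch of the reworked Stats class (illustrative, not part of the diff; assumes Sidekiq can reach Redis):

    require 'sidekiq/api'

    stats = Sidekiq::Stats.new    # fetch_stats! runs once in the constructor (two pipelined round trips)
    stats.processed               # => Integer, served from the memoized @stats hash
    stats.enqueued                # => total jobs waiting across all known queues
    stats.default_queue_latency   # => seconds since the oldest job in queue:default was enqueued
    stats.fetch_stats!            # hit Redis again when fresher numbers are needed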
def reset(*stats)
all = %w(failed processed)
stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
mset_args = []
@@ -23,45 +100,37 @@
Sidekiq.redis do |conn|
conn.mset(*mset_args)
end
end
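A brief sketch of how reset is called (illustrative, not part of the diff):

    stats = Sidekiq::Stats.new
    stats.reset               # zeroes both stat:processed and stat:failed
    stats.reset('failed')     # zeroes only stat:failed; unrecognized names are ignored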
- def queues
- Sidekiq.redis do |conn|
- queues = conn.smembers('queues')
+ private
- lengths = conn.pipelined do
- queues.each do |queue|
- conn.llen("queue:#{queue}")
- end
- end
-
- i = 0
- array_of_arrays = queues.inject({}) do |memo, queue|
- memo[queue] = lengths[i]
- i += 1
- memo
- end.sort_by { |_, size| size }
-
- Hash[array_of_arrays.reverse]
- end
+ def stat(s)
+ @stats[s]
end
- def enqueued
- queues.values.inject(&:+) || 0
- end
+ class Queues
+ def lengths
+ Sidekiq.redis do |conn|
+ queues = conn.smembers('queues'.freeze)
- def scheduled_size
- Sidekiq.redis {|c| c.zcard('schedule') }
- end
+ lengths = conn.pipelined do
+ queues.each do |queue|
+ conn.llen("queue:#{queue}")
+ end
+ end
- def retry_size
- Sidekiq.redis {|c| c.zcard('retry') }
- end
+ i = 0
+ array_of_arrays = queues.inject({}) do |memo, queue|
+ memo[queue] = lengths[i]
+ i += 1
+ memo
+ end.sort_by { |_, size| size }
- def dead_size
- Sidekiq.redis {|c| c.zcard('dead') }
+ Hash[array_of_arrays.reverse]
+ end
+ end
end
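The old Stats#queues reader now lives on the nested Queues class; a short sketch (illustrative, not part of the diff; the queue names and counts are invented):

    # Returns { queue_name => length }, ordered with the longest queue first.
    Sidekiq::Stats::Queues.new.lengths
    # e.g. { "default" => 120, "mailers" => 3 }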
class History
def initialize(days_previous, start_date = nil)
@days_previous = days_previous
@@ -90,12 +159,12 @@
dates << date
i += 1
end
Sidekiq.redis do |conn|
- conn.mget(keys).each_with_index do |value, i|
- stat_hash[dates[i].to_s] = value ? value.to_i : 0
+ conn.mget(keys).each_with_index do |value, idx|
+ stat_hash[dates[idx].to_s] = value ? value.to_i : 0
end
end
stat_hash
end
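A short sketch of the History API this hunk touches (illustrative, not part of the diff; the dates and counts are invented):

    history = Sidekiq::Stats::History.new(3)   # the last three days, ending today
    history.processed   # e.g. { "2015-01-03" => 1500, "2015-01-02" => 1200, "2015-01-01" => 900 }
    history.failed      # same shape, read from the per-day stat:failed:<date> keys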
@@ -116,11 +185,11 @@
#
class Queue
include Enumerable
def self.all
- Sidekiq.redis {|c| c.smembers('queues') }.sort.map {|q| Sidekiq::Queue.new(q) }
+ Sidekiq.redis {|c| c.smembers('queues'.freeze) }.sort.map {|q| Sidekiq::Queue.new(q) }
end
attr_reader :name
def initialize(name="default")
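For reference, a hedged sketch of the Queue API shown in this and the following hunk (illustrative, not part of the diff):

    Sidekiq::Queue.all                      # => [Sidekiq::Queue, ...], sorted by name
    queue = Sidekiq::Queue.new('default')
    queue.size                              # number of jobs waiting in queue:default
    queue.clear                             # deletes the list and removes it from the 'queues' set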
@@ -172,11 +241,11 @@
def clear
Sidekiq.redis do |conn|
conn.multi do
conn.del(@rname)
- conn.srem("queues", name)
+ conn.srem("queues".freeze, name)
end
end
end
alias_method :💣, :clear
end
@@ -265,11 +334,11 @@
private
def safe_load(content, default)
begin
- yield *YAML.load(content)
+ yield(*YAML.load(content))
rescue ::ArgumentError => ex
# #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into
# memory yet so the YAML can't be loaded.
Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == 'development'
default
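The only change in this hunk is parenthesizing the splatted yield, which avoids the ambiguous-splat parse that a bare yield *args can trigger under -w; a tiny sketch:

    def safe_call(args)
      yield(*args)                       # splat the array into block arguments
    end
    safe_call([1, 2]) { |a, b| a + b }   # => 3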
@@ -380,11 +449,11 @@
@name = name
@_size = size
end
def size
- Sidekiq.redis {|c| c.zcard(name) }
+ Sidekiq.redis { |c| c.zcard(name) }
end
def clear
Sidekiq.redis do |conn|
conn.del(name)
@@ -409,11 +478,11 @@
loop do
range_start = page * page_size + offset_size
range_end = page * page_size + offset_size + (page_size - 1)
elements = Sidekiq.redis do |conn|
- conn.zrange name, range_start, range_end, :with_scores => true
+ conn.zrange name, range_start, range_end, with_scores: true
end
break if elements.empty?
page -= 1
elements.each do |element, score|
block.call SortedEntry.new(self, score, element)
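A hedged usage sketch of the sorted-set wrappers this pagination loop serves (RetrySet, ScheduledSet, DeadSet); the worker class name in the filter is invented:

    retries = Sidekiq::RetrySet.new
    retries.size                          # zcard of the underlying 'retry' sorted set
    retries.each do |entry|               # walks the set in page-sized chunks from the back
      entry.jid                           # job id
      entry.delete if entry.klass == 'SomeObsoleteWorker'
    end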
@@ -584,11 +653,11 @@
procs.each do |key|
conn.hmget(key, 'info', 'busy', 'beat')
end
end
- result.each_with_index do |(info, busy, at_s), i|
+ result.each do |info, busy, at_s|
hash = Sidekiq.load_json(info)
yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f))
end
end
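A brief sketch of iterating the process set (illustrative, not part of the diff):

    Sidekiq::ProcessSet.new.each do |process|
      process['hostname']   # from the decoded 'info' JSON blob
      process['busy']       # Integer merged in from the hmget above
      process['beat']       # Float timestamp of the last heartbeat
    end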
@@ -614,10 +683,11 @@
# 'tag' => 'myapp'
# 'concurrency' => 25,
# 'queues' => ['default', 'low'],
# 'busy' => 10,
# 'beat' => <last heartbeat>,
+ # 'identity' => <unique string identifying the process>,
# }
class Process
def initialize(hash)
@attribs = hash
end
@@ -653,10 +723,10 @@
end
end
end
def identity
- @id ||= "#{self['hostname']}:#{self['pid']}"
+ self['identity']
end
end
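With 3.3.1, identity is read straight from the attribute each process records at startup rather than being rebuilt from hostname and pid; a brief sketch with an invented sample value:

    Sidekiq::ProcessSet.new.each do |process|
      # e.g. "app-1.example.com:12345:0123456789ab"; treat it as an opaque unique string
      puts "#{process.identity} busy=#{process['busy']}"
    end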
##
# Programmatic access to the current active worker set.