lib/sidekiq/api.rb in sidekiq-3.3.0 vs lib/sidekiq/api.rb in sidekiq-3.3.1

- old
+ new

@@ -1,18 +1,95 @@ # encoding: utf-8 require 'sidekiq' module Sidekiq class Stats + def initialize + fetch_stats! + end + def processed - Sidekiq.redis { |conn| conn.get("stat:processed") }.to_i + stat :processed end def failed - Sidekiq.redis { |conn| conn.get("stat:failed") }.to_i + stat :failed end + def scheduled_size + stat :scheduled_size + end + + def retry_size + stat :retry_size + end + + def dead_size + stat :dead_size + end + + def enqueued + stat :enqueued + end + + def processes_size + stat :processes_size + end + + def workers_size + stat :workers_size + end + + def default_queue_latency + stat :default_queue_latency + end + + def fetch_stats! + pipe1_res = Sidekiq.redis do |conn| + conn.pipelined do + conn.get('stat:processed'.freeze) + conn.get('stat:failed'.freeze) + conn.zcard('schedule'.freeze) + conn.zcard('retry'.freeze) + conn.zcard('dead'.freeze) + conn.scard('processes'.freeze) + conn.lrange('queue:default'.freeze, -1, -1) + conn.smembers('processes'.freeze) + conn.smembers('queues'.freeze) + end + end + + pipe2_res = Sidekiq.redis do |conn| + conn.pipelined do + pipe1_res[7].each {|key| conn.hget(key, 'busy'.freeze) } + pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") } + end + end + + s = pipe1_res[7].size + workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+) + enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+) + + default_queue_latency = if (entry = pipe1_res[6].first) + Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at'.freeze] + else + 0 + end + @stats = { + processed: pipe1_res[0].to_i, + failed: pipe1_res[1].to_i, + scheduled_size: pipe1_res[2], + retry_size: pipe1_res[3], + dead_size: pipe1_res[4], + processes_size: pipe1_res[5], + + default_queue_latency: default_queue_latency, + workers_size: workers_size, + enqueued: enqueued + } + end + def reset(*stats) all = %w(failed processed) stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s) mset_args = [] @@ -23,45 +100,37 @@ Sidekiq.redis do |conn| conn.mset(*mset_args) end end - def queues - Sidekiq.redis do |conn| - queues = conn.smembers('queues') + private - lengths = conn.pipelined do - queues.each do |queue| - conn.llen("queue:#{queue}") - end - end - - i = 0 - array_of_arrays = queues.inject({}) do |memo, queue| - memo[queue] = lengths[i] - i += 1 - memo - end.sort_by { |_, size| size } - - Hash[array_of_arrays.reverse] - end + def stat(s) + @stats[s] end - def enqueued - queues.values.inject(&:+) || 0 - end + class Queues + def lengths + Sidekiq.redis do |conn| + queues = conn.smembers('queues'.freeze) - def scheduled_size - Sidekiq.redis {|c| c.zcard('schedule') } - end + lengths = conn.pipelined do + queues.each do |queue| + conn.llen("queue:#{queue}") + end + end - def retry_size - Sidekiq.redis {|c| c.zcard('retry') } - end + i = 0 + array_of_arrays = queues.inject({}) do |memo, queue| + memo[queue] = lengths[i] + i += 1 + memo + end.sort_by { |_, size| size } - def dead_size - Sidekiq.redis {|c| c.zcard('dead') } + Hash[array_of_arrays.reverse] + end + end end class History def initialize(days_previous, start_date = nil) @days_previous = days_previous @@ -90,12 +159,12 @@ dates << date i += 1 end Sidekiq.redis do |conn| - conn.mget(keys).each_with_index do |value, i| - stat_hash[dates[i].to_s] = value ? value.to_i : 0 + conn.mget(keys).each_with_index do |value, idx| + stat_hash[dates[idx].to_s] = value ? value.to_i : 0 end end stat_hash end @@ -116,11 +185,11 @@ # class Queue include Enumerable def self.all - Sidekiq.redis {|c| c.smembers('queues') }.sort.map {|q| Sidekiq::Queue.new(q) } + Sidekiq.redis {|c| c.smembers('queues'.freeze) }.sort.map {|q| Sidekiq::Queue.new(q) } end attr_reader :name def initialize(name="default") @@ -172,11 +241,11 @@ def clear Sidekiq.redis do |conn| conn.multi do conn.del(@rname) - conn.srem("queues", name) + conn.srem("queues".freeze, name) end end end alias_method :💣, :clear end @@ -265,11 +334,11 @@ private def safe_load(content, default) begin - yield *YAML.load(content) + yield(*YAML.load(content)) rescue ::ArgumentError => ex # #1761 in dev mode, it's possible to have jobs enqueued which haven't been loaded into # memory yet so the YAML can't be loaded. Sidekiq.logger.warn "Unable to load YAML: #{ex.message}" unless Sidekiq.options[:environment] == 'development' default @@ -380,11 +449,11 @@ @name = name @_size = size end def size - Sidekiq.redis {|c| c.zcard(name) } + Sidekiq.redis { |c| c.zcard(name) } end def clear Sidekiq.redis do |conn| conn.del(name) @@ -409,11 +478,11 @@ loop do range_start = page * page_size + offset_size range_end = page * page_size + offset_size + (page_size - 1) elements = Sidekiq.redis do |conn| - conn.zrange name, range_start, range_end, :with_scores => true + conn.zrange name, range_start, range_end, with_scores: true end break if elements.empty? page -= 1 elements.each do |element, score| block.call SortedEntry.new(self, score, element) @@ -584,11 +653,11 @@ procs.each do |key| conn.hmget(key, 'info', 'busy', 'beat') end end - result.each_with_index do |(info, busy, at_s), i| + result.each do |info, busy, at_s| hash = Sidekiq.load_json(info) yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f)) end end @@ -614,10 +683,11 @@ # 'tag' => 'myapp' # 'concurrency' => 25, # 'queues' => ['default', 'low'], # 'busy' => 10, # 'beat' => <last heartbeat>, + # 'identity' => <unique string identifying the process>, # } class Process def initialize(hash) @attribs = hash end @@ -653,10 +723,10 @@ end end end def identity - @id ||= "#{self['hostname']}:#{self['pid']}" + self['identity'] end end ## # Programmatic access to the current active worker set.