lib/sidekiq/api.rb in sidekiq-4.0.2 vs lib/sidekiq/api.rb in sidekiq-4.1.0
- old
+ new
@@ -189,10 +189,13 @@
# end
#
class Queue
include Enumerable
+ ##
+ # Return all known queues within Redis.
+ #
def self.all
# 'queues' is a Redis SET of every queue name ever pushed to; sort for a
# stable, alphabetical ordering before wrapping each name in a Queue object.
Sidekiq.redis {|c| c.smembers('queues'.freeze) }.sort.map {|q| Sidekiq::Queue.new(q) }
end
attr_reader :name
@@ -209,10 +212,15 @@
# Sidekiq Pro overrides this
def paused?
# Open-source Sidekiq has no queue-pausing feature, so this is always
# false here; Sidekiq Pro replaces this method with a real check.
false
end
+ ##
+ # Calculates this queue's latency, the difference in seconds since the oldest
+ # job in the queue was enqueued.
+ #
+ # @return Float
def latency
entry = Sidekiq.redis do |conn|
conn.lrange(@rname, -1, -1)
end.first
return 0 unless entry
@@ -225,11 +233,11 @@
page = 0
page_size = 50
while true do
range_start = page * page_size - deleted_size
- range_end = page * page_size - deleted_size + (page_size - 1)
+ range_end = range_start + page_size - 1
entries = Sidekiq.redis do |conn|
conn.lrange @rname, range_start, range_end
end
break if entries.empty?
page += 1
@@ -238,10 +246,15 @@
end
deleted_size = initial_size - size
end
end
+ ##
+ # Find the job with the given JID within this queue.
+ #
+ # This is a slow, inefficient operation. Do not use under
+ # normal conditions. Sidekiq Pro contains a faster version.
def find_job(jid)
# Linear scan of the whole queue via Enumerable#detect — O(n) in queue
# size; returns the first Job whose jid matches, or nil.
detect { |j| j.jid == jid }
end
def clear
@@ -347,11 +360,11 @@
end
count != 0
end
def [](name)
- @item.__send__(:[], name)
+ @item[name]
end
private
def safe_load(content, default)
@@ -500,11 +513,11 @@
page = -1
page_size = 50
while true do
range_start = page * page_size + offset_size
- range_end = page * page_size + offset_size + (page_size - 1)
+ range_end = range_start + page_size - 1
elements = Sidekiq.redis do |conn|
conn.zrange name, range_start, range_end, with_scores: true
end
break if elements.empty?
page -= 1
@@ -529,10 +542,15 @@
end
result
end
end
+ ##
+ # Find the job with the given JID within this sorted set.
+ #
+ # This is a slow, inefficient operation. Do not use under
+ # normal conditions. Sidekiq Pro contains a faster version.
def find_job(jid)
# Linear scan of the entire sorted set — O(n); returns the first entry
# whose jid matches, or nil if no such job exists.
self.detect { |j| j.jid == jid }
end
def delete_by_value(name, value)
@@ -631,11 +649,10 @@
# right now. Each process send a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
#
# Yields a Sidekiq::Process.
#
-
class ProcessSet
include Enumerable
def initialize(clean_plz=true)
self.class.cleanup if clean_plz
@@ -672,17 +689,17 @@
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
result = conn.pipelined do
procs.each do |key|
- conn.hmget(key, 'info', 'busy', 'beat')
+ conn.hmget(key, 'info', 'busy', 'beat', 'quiet')
end
end
- result.each do |info, busy, at_s|
+ result.each do |info, busy, at_s, quiet|
hash = Sidekiq.load_json(info)
- yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f))
+ yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f, 'quiet' => quiet))
end
end
nil
end
@@ -695,11 +712,12 @@
Sidekiq.redis { |conn| conn.scard('processes') }
end
end
#
- # Sidekiq::Process has a set of attributes which look like this:
+ # Sidekiq::Process represents an active Sidekiq process talking with Redis.
+ # Each process has a set of attributes which look like this:
#
# {
# 'hostname' => 'app-1.example.com',
# 'started_at' => <process start time>,
# 'pid' => 12345,
@@ -737,10 +755,14 @@
def dump_threads
# Ask this process to dump backtraces for all of its threads by queuing
# the TTIN signal for it (delivered via the private signal helper).
signal('TTIN')
end
def stopping?
# The 'quiet' attribute comes straight from Redis as a string
# ('true'/'false'), hence the string comparison rather than a boolean test.
self['quiet'] == 'true'
end
+
private
def signal(sig)
key = "#{identity}-signals"
Sidekiq.redis do |c|
@@ -755,9 +777,10 @@
self['identity']
end
end
##
+ # A worker is a thread that is currently processing a job.
# Programmatic access to the current active worker set.
#
# WARNING WARNING WARNING
#
# This is live data that can change every millisecond.