lib/sidekiq/api.rb in sidekiq-5.0.0 vs lib/sidekiq/api.rb in sidekiq-5.0.1
- old
+ new
@@ -144,15 +144,15 @@
@days_previous = days_previous
@start_date = start_date || Time.now.utc.to_date
end
def processed
- date_stat_hash("processed")
+ @processed ||= date_stat_hash("processed")
end
def failed
- date_stat_hash("failed")
+ @failed ||= date_stat_hash("failed")
end
private
def date_stat_hash(stat)
@@ -167,14 +167,19 @@
keys << "stat:#{stat}:#{datestr}"
dates << datestr
i += 1
end
- Sidekiq.redis do |conn|
- conn.mget(keys).each_with_index do |value, idx|
- stat_hash[dates[idx]] = value ? value.to_i : 0
+ begin
+ Sidekiq.redis do |conn|
+ conn.mget(keys).each_with_index do |value, idx|
+ stat_hash[dates[idx]] = value ? value.to_i : 0
+ end
end
+ rescue Redis::CommandError
+ # mget will trigger a CROSSSLOT error when run against a Cluster
+ # TODO Someone want to add Cluster support?
end
stat_hash
end
end
@@ -723,10 +728,15 @@
conn.hmget(key, 'info', 'busy', 'beat', 'quiet')
end
end
result.each do |info, busy, at_s, quiet|
+ # If a process is stopped between when we query Redis for `procs` and
+ # when we query for `result`, we will have an item in `result` that is
+ # composed of `nil` values.
+ next if info.nil?
+
hash = Sidekiq.load_json(info)
yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f, 'quiet' => quiet))
end
end
@@ -738,10 +748,22 @@
# contains Sidekiq processes which have sent a heartbeat within the last
# 60 seconds.
def size
Sidekiq.redis { |conn| conn.scard('processes') }
end
+
+ # Returns the identity of the current cluster leader or "" if no leader.
+ # This is a Sidekiq Enterprise feature, will always return "" in Sidekiq
+ # or Sidekiq Pro.
+ def leader
+ @leader ||= begin
+ x = Sidekiq.redis {|c| c.get("dear-leader") }
+ # need a non-falsy value so we can memoize
+ x = "" unless x
+ x
+ end
+ end
end
#
# Sidekiq::Process represents an active Sidekiq process talking with Redis.
# Each process has a set of attributes which look like this:
@@ -772,10 +794,14 @@
def [](key)
@attribs[key]
end
+ def identity
+ self['identity']
+ end
+
def quiet!
signal('TSTP')
end
def stop!
@@ -800,12 +826,9 @@
c.expire(key, 60)
end
end
end
- def identity
- self['identity']
- end
end
##
# A worker is a thread that is currently processing a job.
# Programmatic access to the current active worker set.