lib/sidekiq/api.rb in sidekiq-5.0.0 vs lib/sidekiq/api.rb in sidekiq-5.0.1

- old
+ new

@@ -144,15 +144,15 @@ @days_previous = days_previous @start_date = start_date || Time.now.utc.to_date end def processed - date_stat_hash("processed") + @processed ||= date_stat_hash("processed") end def failed - date_stat_hash("failed") + @failed ||= date_stat_hash("failed") end private def date_stat_hash(stat) @@ -167,14 +167,19 @@ keys << "stat:#{stat}:#{datestr}" dates << datestr i += 1 end - Sidekiq.redis do |conn| - conn.mget(keys).each_with_index do |value, idx| - stat_hash[dates[idx]] = value ? value.to_i : 0 + begin + Sidekiq.redis do |conn| + conn.mget(keys).each_with_index do |value, idx| + stat_hash[dates[idx]] = value ? value.to_i : 0 + end end + rescue Redis::CommandError + # mget will trigger a CROSSSLOT error when run against a Cluster + # TODO Someone want to add Cluster support? end stat_hash end end @@ -723,10 +728,15 @@ conn.hmget(key, 'info', 'busy', 'beat', 'quiet') end end result.each do |info, busy, at_s, quiet| + # If a process is stopped between when we query Redis for `procs` and + # when we query for `result`, we will have an item in `result` that is + # composed of `nil` values. + next if info.nil? + hash = Sidekiq.load_json(info) yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f, 'quiet' => quiet)) end end @@ -738,10 +748,22 @@ # contains Sidekiq processes which have sent a heartbeat within the last # 60 seconds. def size Sidekiq.redis { |conn| conn.scard('processes') } end + + # Returns the identity of the current cluster leader or "" if no leader. + # This is a Sidekiq Enterprise feature, will always return "" in Sidekiq + # or Sidekiq Pro. + def leader + @leader ||= begin + x = Sidekiq.redis {|c| c.get("dear-leader") } + # need a non-falsy value so we can memoize + x = "" unless x + x + end + end end # # Sidekiq::Process represents an active Sidekiq process talking with Redis. # Each process has a set of attributes which look like this: @@ -772,10 +794,14 @@ def [](key) @attribs[key] end + def identity + self['identity'] + end + def quiet! signal('TSTP') end def stop! @@ -800,12 +826,9 @@ c.expire(key, 60) end end end - def identity - self['identity'] - end end ## # A worker is a thread that is currently processing a job. # Programmatic access to the current active worker set.