lib/sidekiq/api.rb in sidekiq-4.0.2 vs lib/sidekiq/api.rb in sidekiq-4.1.0

- old
+ new

@@ -189,10 +189,13 @@ # end # class Queue include Enumerable + ## + # Return all known queues within Redis. + # def self.all Sidekiq.redis {|c| c.smembers('queues'.freeze) }.sort.map {|q| Sidekiq::Queue.new(q) } end attr_reader :name @@ -209,10 +212,15 @@ # Sidekiq Pro overrides this def paused? false end + ## + # Calculates this queue's latency, the difference in seconds since the oldest + # job in the queue was enqueued. + # + # @return Float def latency entry = Sidekiq.redis do |conn| conn.lrange(@rname, -1, -1) end.first return 0 unless entry @@ -225,11 +233,11 @@ page = 0 page_size = 50 while true do range_start = page * page_size - deleted_size - range_end = page * page_size - deleted_size + (page_size - 1) + range_end = range_start + page_size - 1 entries = Sidekiq.redis do |conn| conn.lrange @rname, range_start, range_end end break if entries.empty? page += 1 @@ -238,10 +246,15 @@ end deleted_size = initial_size - size end end + ## + # Find the job with the given JID within this queue. + # + # This is a slow, inefficient operation. Do not use under + # normal conditions. Sidekiq Pro contains a faster version. def find_job(jid) detect { |j| j.jid == jid } end def clear @@ -347,11 +360,11 @@ end count != 0 end def [](name) - @item.__send__(:[], name) + @item[name] end private def safe_load(content, default) @@ -500,11 +513,11 @@ page = -1 page_size = 50 while true do range_start = page * page_size + offset_size - range_end = page * page_size + offset_size + (page_size - 1) + range_end = range_start + page_size - 1 elements = Sidekiq.redis do |conn| conn.zrange name, range_start, range_end, with_scores: true end break if elements.empty? page -= 1 @@ -529,10 +542,15 @@ end result end end + ## + # Find the job with the given JID within this sorted set. + # + # This is a slow, inefficient operation. Do not use under + # normal conditions. Sidekiq Pro contains a faster version. def find_job(jid) self.detect { |j| j.jid == jid } end def delete_by_value(name, value) @@ -631,11 +649,10 @@ # right now. Each process send a heartbeat to Redis every 5 seconds # so this set should be relatively accurate, barring network partitions. # # Yields a Sidekiq::Process. # - class ProcessSet include Enumerable def initialize(clean_plz=true) self.class.cleanup if clean_plz @@ -672,17 +689,17 @@ # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way result = conn.pipelined do procs.each do |key| - conn.hmget(key, 'info', 'busy', 'beat') + conn.hmget(key, 'info', 'busy', 'beat', 'quiet') end end - result.each do |info, busy, at_s| + result.each do |info, busy, at_s, quiet| hash = Sidekiq.load_json(info) - yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f)) + yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f, 'quiet' => quiet)) end end nil end @@ -695,11 +712,12 @@ Sidekiq.redis { |conn| conn.scard('processes') } end end # - # Sidekiq::Process has a set of attributes which look like this: + # Sidekiq::Process represents an active Sidekiq process talking with Redis. + # Each process has a set of attributes which look like this: # # { # 'hostname' => 'app-1.example.com', # 'started_at' => <process start time>, # 'pid' => 12345, @@ -737,10 +755,14 @@ def dump_threads signal('TTIN') end + def stopping? + self['quiet'] == 'true' + end + private def signal(sig) key = "#{identity}-signals" Sidekiq.redis do |c| @@ -755,9 +777,10 @@ self['identity'] end end ## + # A worker is a thread that is currently processing a job. # Programmatic access to the current active worker set. # # WARNING WARNING WARNING # # This is live data that can change every millisecond.