lib/sidekiq/api.rb in sidekiq-5.1.3 vs lib/sidekiq/api.rb in sidekiq-5.2.0

- old
+ new

@@ -1,10 +1,26 @@ # frozen_string_literal: true require 'sidekiq' module Sidekiq + + module RedisScanner + def sscan(conn, key) + cursor = '0' + result = [] + loop do + cursor, values = conn.sscan(key, cursor) + result.push(*values) + break if cursor == '0' + end + result + end + end + class Stats + include RedisScanner + def initialize fetch_stats! end def processed @@ -55,23 +71,29 @@ conn.zcard('schedule') conn.zcard('retry') conn.zcard('dead') conn.scard('processes') conn.lrange('queue:default', -1, -1) - conn.smembers('processes') - conn.smembers('queues') end end + processes = Sidekiq.redis do |conn| + sscan(conn, 'processes') + end + + queues = Sidekiq.redis do |conn| + sscan(conn, 'queues') + end + pipe2_res = Sidekiq.redis do |conn| conn.pipelined do - pipe1_res[7].each {|key| conn.hget(key, 'busy') } - pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") } + processes.each {|key| conn.hget(key, 'busy') } + queues.each {|queue| conn.llen("queue:#{queue}") } end end - s = pipe1_res[7].size + s = processes.size workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+) enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+) default_queue_latency = if (entry = pipe1_res[6].first) job = Sidekiq.load_json(entry) rescue {} @@ -114,13 +136,15 @@ def stat(s) @stats[s] end class Queues + include RedisScanner + def lengths Sidekiq.redis do |conn| - queues = conn.smembers('queues') + queues = sscan(conn, 'queues') lengths = conn.pipelined do queues.each do |queue| conn.llen("queue:#{queue}") end @@ -196,22 +220,23 @@ # job.delete if job.jid == 'abcdef1234567890' # end # class Queue include Enumerable + extend RedisScanner ## # Return all known queues within Redis. # def self.all - Sidekiq.redis { |c| c.smembers('queues') }.sort.map { |q| Sidekiq::Queue.new(q) } + Sidekiq.redis { |c| sscan(c, 'queues') }.sort.map { |q| Sidekiq::Queue.new(q) } end attr_reader :name def initialize(name="default") - @name = name + @name = name.to_s @rname = "queue:#{name}" end def size Sidekiq.redis { |con| con.llen(@rname) } @@ -698,21 +723,22 @@ # # Yields a Sidekiq::Process. # class ProcessSet include Enumerable + include RedisScanner def initialize(clean_plz=true) - self.class.cleanup if clean_plz + cleanup if clean_plz end # Cleans up dead processes recorded in Redis. # Returns the number of processes cleaned. - def self.cleanup + def cleanup count = 0 Sidekiq.redis do |conn| - procs = conn.smembers('processes').sort + procs = sscan(conn, 'processes').sort heartbeats = conn.pipelined do procs.each do |key| conn.hget(key, 'info') end end @@ -728,11 +754,11 @@ end count end def each - procs = Sidekiq.redis { |conn| conn.smembers('processes') }.sort + procs = Sidekiq.redis { |conn| sscan(conn, 'processes') }.sort Sidekiq.redis do |conn| # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way @@ -863,14 +889,15 @@ # # run_at is an epoch Integer. # end # class Workers include Enumerable + include RedisScanner def each Sidekiq.redis do |conn| - procs = conn.smembers('processes') + procs = sscan(conn, 'processes') procs.sort.each do |key| valid, workers = conn.pipelined do conn.exists(key) conn.hgetall("#{key}:workers") end @@ -888,10 +915,10 @@ # Not very efficient if you have lots of Sidekiq # processes but the alternative is a global counter # which can easily get out of sync with crashy processes. def size Sidekiq.redis do |conn| - procs = conn.smembers('processes') + procs = sscan(conn, 'processes') if procs.empty? 0 else conn.pipelined do procs.each do |key|