lib/sidekiq/api.rb in sidekiq-5.1.3 vs lib/sidekiq/api.rb in sidekiq-5.2.0
- old (line removed in 5.2.0)
+ new (line added in 5.2.0)
@@ -1,10 +1,26 @@
# frozen_string_literal: true
require 'sidekiq'
module Sidekiq
+
+ module RedisScanner
+ def sscan(conn, key)
+ cursor = '0'
+ result = []
+ loop do
+ cursor, values = conn.sscan(key, cursor)
+ result.push(*values)
+ break if cursor == '0'
+ end
+ result
+ end
+ end
+
class Stats
+ include RedisScanner
+
def initialize
fetch_stats!
end
def processed
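
The RedisScanner module above is the heart of this release: every SMEMBERS call below is replaced with cursor-based SSCAN iteration, so Redis streams large sets in batches instead of materializing them in a single blocking reply. A minimal sketch of the same cursor loop against a live connection, assuming redis-rb and a hypothetical key 'myset':

  require 'redis'

  conn = Redis.new
  conn.sadd('myset', %w[a b c])  # hypothetical set for illustration

  cursor  = '0'
  members = []
  loop do
    # redis-rb's sscan returns [next_cursor, batch]; '0' means the scan is complete.
    cursor, values = conn.sscan('myset', cursor)
    members.push(*values)
    break if cursor == '0'
  end
  members.sort  # => ["a", "b", "c"]
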
@@ -55,23 +71,29 @@
conn.zcard('schedule')
conn.zcard('retry')
conn.zcard('dead')
conn.scard('processes')
conn.lrange('queue:default', -1, -1)
- conn.smembers('processes')
- conn.smembers('queues')
end
end
+ processes = Sidekiq.redis do |conn|
+ sscan(conn, 'processes')
+ end
+
+ queues = Sidekiq.redis do |conn|
+ sscan(conn, 'queues')
+ end
+
pipe2_res = Sidekiq.redis do |conn|
conn.pipelined do
- pipe1_res[7].each {|key| conn.hget(key, 'busy') }
- pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") }
+ processes.each {|key| conn.hget(key, 'busy') }
+ queues.each {|queue| conn.llen("queue:#{queue}") }
end
end
- s = pipe1_res[7].size
+ s = processes.size
workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)
enqueued = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)
default_queue_latency = if (entry = pipe1_res[6].first)
job = Sidekiq.load_json(entry) rescue {}
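
fetch_stats! now splits the work: the counters stay pipelined, the two set reads go through sscan, and a second pipeline fetches per-process 'busy' counts followed by per-queue lengths. Since conn.pipelined returns one reply per command in submission order, slicing by processes.size is what separates the two groups. A worked sketch with hypothetical values:

  processes = ['host:1234', 'host:5678']  # hypothetical process keys
  queues    = ['default', 'mailers']      # hypothetical queue names

  # HGET replies (strings) come first, then LLEN replies (integers).
  pipe2_res = ['3', '1', 10, 0]
  s = processes.size
  workers_size = pipe2_res[0...s].map(&:to_i).inject(0, &:+)   # => 4
  enqueued     = pipe2_res[s..-1].map(&:to_i).inject(0, &:+)   # => 10
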
@@ -114,13 +136,15 @@
def stat(s)
@stats[s]
end
class Queues
+ include RedisScanner
+
def lengths
Sidekiq.redis do |conn|
- queues = conn.smembers('queues')
+ queues = sscan(conn, 'queues')
lengths = conn.pipelined do
queues.each do |queue|
conn.llen("queue:#{queue}")
end
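
Queues#lengths follows the same pattern: scan the names, pipeline one LLEN per queue, then pair names with replies by position. The diff cuts off before the method's return value, so the Hash below is an assumption about shape, not the library's code:

  queues  = ['default', 'mailers']  # hypothetical scanned names
  lengths = [10, 0]                 # hypothetical LLEN replies
  Hash[queues.zip(lengths)]         # => {"default"=>10, "mailers"=>0}
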
@@ -196,22 +220,23 @@
# job.delete if job.jid == 'abcdef1234567890'
# end
#
class Queue
include Enumerable
+ extend RedisScanner
##
# Return all known queues within Redis.
#
def self.all
- Sidekiq.redis { |c| c.smembers('queues') }.sort.map { |q| Sidekiq::Queue.new(q) }
+ Sidekiq.redis { |c| sscan(c, 'queues') }.sort.map { |q| Sidekiq::Queue.new(q) }
end
attr_reader :name
def initialize(name="default")
- @name = name
+ @name = name.to_s
@rname = "queue:#{name}"
end
def size
Sidekiq.redis { |con| con.llen(@rname) }
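
The name.to_s coercion is a small behavioral fix: a Symbol and a String argument now name the same queue, where 5.1.3 kept whatever object was passed. Illustrative only:

  Sidekiq::Queue.new(:default).name   # 5.2.0 => "default"; 5.1.3 => :default
  Sidekiq::Queue.new('default').name  # => "default" in both versions
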
@@ -698,21 +723,22 @@
#
# Yields a Sidekiq::Process.
#
class ProcessSet
include Enumerable
+ include RedisScanner
def initialize(clean_plz=true)
- self.class.cleanup if clean_plz
+ cleanup if clean_plz
end
# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
- def self.cleanup
+ def cleanup
count = 0
Sidekiq.redis do |conn|
- procs = conn.smembers('processes').sort
+ procs = sscan(conn, 'processes').sort
heartbeats = conn.pipelined do
procs.each do |key|
conn.hget(key, 'info')
end
end
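
Turning cleanup into an instance method lets it use the included RedisScanner, but it breaks direct callers of the old class method. A hedged migration sketch:

  # 5.1.3:
  # Sidekiq::ProcessSet.cleanup
  # 5.2.0 (pass false so initialize doesn't run cleanup a second time):
  Sidekiq::ProcessSet.new(false).cleanup
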
@@ -728,11 +754,11 @@
end
count
end
def each
- procs = Sidekiq.redis { |conn| conn.smembers('processes') }.sort
+ procs = Sidekiq.redis { |conn| sscan(conn, 'processes') }.sort
Sidekiq.redis do |conn|
# We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
# you'll be happier this way
@@ -863,14 +889,15 @@
# # run_at is an epoch Integer.
# end
#
class Workers
include Enumerable
+ include RedisScanner
def each
Sidekiq.redis do |conn|
- procs = conn.smembers('processes')
+ procs = sscan(conn, 'processes')
procs.sort.each do |key|
valid, workers = conn.pipelined do
conn.exists(key)
conn.hgetall("#{key}:workers")
end
@@ -888,10 +915,10 @@
# Not very efficient if you have lots of Sidekiq
# processes but the alternative is a global counter
# which can easily get out of sync with crashy processes.
def size
Sidekiq.redis do |conn|
- procs = conn.smembers('processes')
+ procs = sscan(conn, 'processes')
if procs.empty?
0
else
conn.pipelined do
procs.each do |key|