bin/sidekiqctl in sidekiq-5.2.2 vs bin/sidekiqctl in sidekiq-5.2.3
- old
+ new
@@ -1,22 +1,29 @@
#!/usr/bin/env ruby
require 'fileutils'
+require 'sidekiq/api'
class Sidekiqctl
DEFAULT_KILL_TIMEOUT = 10
+ CMD = File.basename($0)
attr_reader :stage, :pidfile, :kill_timeout
def self.print_usage
- puts "#{File.basename($0)} - stop a Sidekiq process from the command line."
+ puts "#{CMD} - control Sidekiq from the command line."
puts
- puts "Usage: #{File.basename($0)} <command> <pidfile> <kill_timeout>"
- puts " where <command> is either 'quiet' or 'stop'"
+ puts "Usage: #{CMD} quiet <pidfile> <kill_timeout>"
+ puts " #{CMD} stop <pidfile> <kill_timeout>"
+ puts " #{CMD} status <section>"
+ puts
puts " <pidfile> is path to a pidfile"
puts " <kill_timeout> is number of seconds to wait until Sidekiq exits"
puts " (default: #{Sidekiqctl::DEFAULT_KILL_TIMEOUT}), after which Sidekiq will be KILL'd"
+ puts
+ puts " <section> (optional) view a specific section of the status output"
+ puts " Valid sections are: #{Sidekiqctl::Status::VALID_SECTIONS.join(', ')}"
puts
puts "Be sure to set the kill_timeout LONGER than Sidekiq's -t timeout. If you want"
puts "to wait 60 seconds for jobs to finish, use `sidekiq -t 60` and `sidekiqctl stop"
puts " path_to_pidfile 61`"
puts
@@ -83,9 +90,140 @@
`kill -9 #{pid}`
FileUtils.rm_f pidfile
done 'Sidekiq shut down forcefully.'
end
alias_method :shutdown, :stop
+
+ class Status
+ VALID_SECTIONS = %w[all version overview processes queues]
+ def display(section = nil)
+ section ||= 'all'
+ unless VALID_SECTIONS.include? section
+ puts "I don't know how to check the status of '#{section}'!"
+ puts "Try one of these: #{VALID_SECTIONS.join(', ')}"
+ return
+ end
+ send(section)
+ rescue StandardError => e
+ puts "Couldn't get status: #{e}"
+ end
+
+ def all
+ version
+ puts
+ overview
+ puts
+ processes
+ puts
+ queues
+ end
+
+ def version
+ puts "Sidekiq #{Sidekiq::VERSION}"
+ puts Time.now
+ end
+
+ def overview
+ puts '---- Overview ----'
+ puts " Processed: #{delimit stats.processed}"
+ puts " Failed: #{delimit stats.failed}"
+ puts " Busy: #{delimit stats.workers_size}"
+ puts " Enqueued: #{delimit stats.enqueued}"
+ puts " Retries: #{delimit stats.retry_size}"
+ puts " Scheduled: #{delimit stats.scheduled_size}"
+ puts " Dead: #{delimit stats.dead_size}"
+ end
+
+ def processes
+ puts "---- Processes (#{process_set.size}) ----"
+ process_set.each_with_index do |process, index|
+ puts "#{process['identity']} #{tags_for(process)}"
+ puts " Started: #{Time.at(process['started_at'])} (#{time_ago(process['started_at'])})"
+ puts " Threads: #{process['concurrency']} (#{process['busy']} busy)"
+ puts " Queues: #{split_multiline(process['queues'].sort, pad: 11)}"
+ puts '' unless (index+1) == process_set.size
+ end
+ end
+
+ COL_PAD = 2
+ def queues
+ puts "---- Queues (#{queue_data.size}) ----"
+ columns = {
+ name: [:ljust, (['name'] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
+ size: [:rjust, (['size'] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
+ latency: [:rjust, (['latency'] + queue_data.map(&:latency)).map(&:length).max + COL_PAD]
+ }
+ columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
+ puts
+ queue_data.each do |q|
+ columns.each do |col, (dir, width)|
+ print q.send(col).public_send(dir, width)
+ end
+ puts
+ end
+ end
+
+ private
+
+ def delimit(number)
+ number.to_s.reverse.scan(/.{1,3}/).join(',').reverse
+ end
+
+ def split_multiline(values, opts = {})
+ return 'none' unless values
+ pad = opts[:pad] || 0
+ max_length = opts[:max_length] || (80 - pad)
+ out = []
+ line = ''
+ values.each do |value|
+ if (line.length + value.length) > max_length
+ out << line
+ line = ' ' * pad
+ end
+ line << value + ', '
+ end
+ out << line[0..-3]
+ out.join("\n")
+ end
+
+ def tags_for(process)
+ tags = [
+ process['tag'],
+ process['labels'],
+ (process['quiet'] == 'true' ? 'quiet' : nil)
+ ].flatten.compact
+ tags.any? ? "[#{tags.join('] [')}]" : nil
+ end
+
+ def time_ago(timestamp)
+ seconds = Time.now - Time.at(timestamp)
+ return 'just now' if seconds < 60
+ return 'a minute ago' if seconds < 120
+ return "#{seconds.floor / 60} minutes ago" if seconds < 3600
+ return 'an hour ago' if seconds < 7200
+ "#{seconds.floor / 60 / 60} hours ago"
+ end
+
+ QUEUE_STRUCT = Struct.new(:name, :size, :latency)
+ def queue_data
+ @queue_data ||= Sidekiq::Queue.all.map do |q|
+ QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf('%#.2f', q.latency))
+ end
+ end
+
+ def process_set
+ @process_set ||= Sidekiq::ProcessSet.new
+ end
+
+ def stats
+ @stats ||= Sidekiq::Stats.new
+ end
+ end
+end
+
+if ARGV[0] == 'status'
+ Sidekiqctl::Status.new.display(ARGV[1])
+ exit
end
if ARGV.length < 2
Sidekiqctl.print_usage
else