#!/usr/bin/env ruby # # bin/bench is a helpful script to load test and # performance tune Sidekiq's core. It's a configurable script, # which accepts the following parameters as ENV variables. # # QUEUES # Number of queues to consume from. Default is 8 # # PROCESSES # The number of processes this benchmark will create. Each process, consumes # from one of the available queues. When processes are more than the number of # queues, they are distributed to processes in round robin. Default is 8 # # ELEMENTS # Number of jobs to push to each queue. Default is 1000 # # ITERATIONS # Each queue pushes ITERATIONS times ELEMENTS jobs. Default is 1000 # # PORT # The port of the Dragonfly instance. Default is 6379 # # IP # The ip of the Dragonfly instance. Default is 127.0.0.1 # # Example Usage: # # > RUBY_YJIT_ENABLE=1 THREADS=10 PROCESSES=8 QUEUES=8 bin/multi_queue_bench # # None of this script is considered a public API and may change over time. # # Quiet some warnings we see when running in warning mode: # RUBYOPT=-w bundle exec sidekiq $TESTING = false puts RUBY_DESCRIPTION require "bundler/setup" Bundler.require(:default, :load_test) class LoadWorker include Sidekiq::Job sidekiq_options retry: 1 sidekiq_retry_in do |x| 1 end def perform(idx, ts = nil) puts(Time.now.to_f - ts) if !ts.nil? # raise idx.to_s if idx % 100 == 1 end end def Process.rss `ps -o rss= -p #{Process.pid}`.chomp.to_i end $iterations = ENV["ITERATIONS"] ? Integer(ENV["ITERATIONS"]) : 1_000 $elements = ENV["ELEMENTS"] ? Integer(ENV["ELEMENTS"]) : 1_000 $port = ENV["PORT"] ? Integer(ENV["PORT"]) : 6379 $ip = ENV["IP"] ? String(ENV["IP"]) : "127.0.0.1" class Loader def initialize @iter = $iterations @count = $elements end def configure(queue) @x = Sidekiq.configure_embed do |config| config.redis = {db: 0, host: $ip, port: $port} config.concurrency = Integer(ENV.fetch("THREADS", "30")) config.queues = queue config.logger.level = Logger::WARN config.average_scheduled_poll_interval = 2 config.reliable! if defined?(Sidekiq::Pro) end @self_read, @self_write = IO.pipe %w[INT TERM TSTP TTIN].each do |sig| trap sig do @self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end def handle_signal(sig) launcher = @x Sidekiq.logger.debug "Got #{sig} signal" case sig when "INT" # Handle Ctrl-C in JRuby like MRI # http://jira.codehaus.org/browse/JRUBY-4637 raise Interrupt when "TERM" # Heroku sends TERM and then waits 30 seconds for process to exit. raise Interrupt when "TSTP" Sidekiq.logger.info "Received TSTP, no longer accepting new work" launcher.quiet when "TTIN" Thread.list.each do |thread| Sidekiq.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread["label"]}" if thread.backtrace Sidekiq.logger.warn thread.backtrace.join("\n") else Sidekiq.logger.warn "" end end end end def setup(queue) Sidekiq.logger.error("Setup RSS: #{Process.rss}") Sidekiq.logger.error("Pushing work to queue: #{queue}") start = Time.now @iter.times do arr = Array.new(@count) { |idx| [idx] } #always prepends by queue:: that's why we pass 'q1, q2 etc' instead of `queue::q1` Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => arr, "queue" => queue) end end def monitor_single(queue) q = "queue:#{queue}" @monitor_single = Thread.new do GC.start loop do sleep 0.2 total = Sidekiq.redis do |conn| conn.llen q end if total == 0 sleep 0.1 @x.stop Process.kill("INT", $$) break end end end end def monitor_all(queues) @monitor_all = Thread.new do GC.start loop do sleep 0.2 qsize = 0 queues.each do |q| tmp = Sidekiq.redis do |conn| conn.llen q end qsize = qsize + tmp end total = qsize if total == 0 ending = Time.now - @start size = @iter * @count * queues.length() Sidekiq.logger.error("Done, #{size} jobs in #{ending} sec, #{(size / ending).to_i} jobs/sec") Sidekiq.logger.error("Ending RSS: #{Process.rss}") sleep 0.1 @x.stop Process.kill("INT", $$) break end end end end def run(queues, queue, monitor_all_queues) #Sidekiq.logger.warn("Consuming from #{queue}") if monitor_all_queues monitor_all(queues) else monitor_single(queue) end @start = Time.now @x.run while (readable_io = IO.select([@self_read])) signal = readable_io.first[0].gets.strip handle_signal(signal) end # normal rescue Interrupt rescue => e raise e if $DEBUG warn e.message warn e.backtrace.join("\n") exit 1 ensure @x.stop end end def setup(queue) ll = Loader.new ll.configure(queue) ll.setup(queue) end def consume(queues, queue, monitor_all_queues) ll = Loader.new ll.configure(queue) ll.run(queues, queue, monitor_all_queues) end # We assign one queue to each sidekiq process def run(number_of_processes, total_queues) read_stream, write_stream = IO.pipe queues = [] (0..total_queues-1).each do |idx| queues.push("queue:q#{idx}") end Sidekiq.logger.info("Queues are: #{queues}") # Produce start = Time.now (0..total_queues-1).each do |idx| Process.fork do queue_num = "q#{idx}" setup(queue_num) end end queue_sz = $iterations * $elements * total_queues Process.waitall ending = Time.now - start #Sidekiq.logger.info("Pushed #{queue_sz} in #{ending} secs") # Consume (0..number_of_processes-1).each do |idx| Process.fork do # First process only consumes from it's own queue but monitors all queues. # It works as a synchronization point. Once all processes finish # (that is, when all queues are emptied) it prints the the stats. if idx == 0 queue = "q#{idx}" consume(queues, queue, true) else queue = "q#{idx % total_queues}" consume(queues, queue, false) end end end Process.waitall write_stream.close results = read_stream.read read_stream.close end $total_processes = ENV["PROCESSES"] ? Integer(ENV["PROCESSES"]) : 8; $total_queues = ENV["QUEUES"] ? Integer(ENV["QUEUES"]) : 8; run($total_processes, $total_queues)