#!/usr/bin/env ruby # Quiet some warnings we see when running in warning mode: # RUBYOPT=-w bundle exec sidekiq $TESTING = false puts RUBY_DESCRIPTION puts(%w[THREADS LATENCY AJ PROFILE].map { |x| "#{x}: #{ENV[x] || "nil"}" }.join(", ")) require "ruby-prof" if ENV["PROFILE"] require "bundler/setup" Bundler.require(:default, :load_test) latency = Integer(ENV["LATENCY"] || 1) if latency > 0 # brew tap shopify/shopify # brew install toxiproxy # run `toxiproxy-server` in a separate terminal window. require "toxiproxy" # simulate a non-localhost network for realer-world conditions. # adding 1ms of network latency has an ENORMOUS impact on benchmarks Toxiproxy.populate([{ name: "redis", listen: "127.0.0.1:6380", upstream: "127.0.0.1:6379" }]) end if ENV["AJ"] require "active_job" puts "Using ActiveJob #{ActiveJob::VERSION::STRING}" ActiveJob::Base.queue_adapter = :sidekiq ActiveJob::Base.logger.level = Logger::WARN class LoadJob < ActiveJob::Base def perform(idx, ts=nil) puts(Time.now.to_f - ts) if !ts.nil? end end end class LoadWorker include Sidekiq::Job sidekiq_options retry: 1 sidekiq_retry_in do |x| 1 end def perform(idx, ts = nil) puts(Time.now.to_f - ts) if !ts.nil? # raise idx.to_s if idx % 100 == 1 end end def Process.rss `ps -o rss= -p #{Process.pid}`.chomp.to_i end class Loader def initialize @iter = ENV["GC"] ? 10 : 500 @count = Integer(ENV["COUNT"] || 1_000) @latency = Integer(ENV["LATENCY"] || 1) end def configure @x = Sidekiq.configure_embed do |config| config.redis = {db: 13, port: ((@latency > 0) ? 6380 : 6379)} config.concurrency = Integer(ENV.fetch("THREADS", "10")) # config.redis = { db: 13, port: 6380, driver: :hiredis} config.queues = %w[default] config.logger.level = Logger::WARN config.average_scheduled_poll_interval = 2 config.reliable! if defined?(Sidekiq::Pro) end @self_read, @self_write = IO.pipe %w[INT TERM TSTP TTIN].each do |sig| trap sig do @self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end def handle_signal(sig) launcher = @x Sidekiq.logger.debug "Got #{sig} signal" case sig when "INT" # Handle Ctrl-C in JRuby like MRI # http://jira.codehaus.org/browse/JRUBY-4637 raise Interrupt when "TERM" # Heroku sends TERM and then waits 30 seconds for process to exit. raise Interrupt when "TSTP" Sidekiq.logger.info "Received TSTP, no longer accepting new work" launcher.quiet when "TTIN" Thread.list.each do |thread| Sidekiq.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread["label"]}" if thread.backtrace Sidekiq.logger.warn thread.backtrace.join("\n") else Sidekiq.logger.warn "<no backtrace available>" end end end end def setup Sidekiq.logger.error("Setup RSS: #{Process.rss}") Sidekiq.redis { |c| c.flushdb } start = Time.now if ENV["AJ"] @iter.times do @count.times do |idx| LoadJob.perform_later(idx) end end else @iter.times do arr = Array.new(@count) { |idx| [idx] } Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => arr) end end Sidekiq.logger.warn "Created #{@count * @iter} jobs in #{Time.now - start} sec" end def monitor @monitor = Thread.new do GC.start loop do sleep 0.2 qsize = Sidekiq.redis do |conn| conn.llen "queue:default" end total = qsize if total == 0 ending = Time.now - @start size = @iter * @count Sidekiq.logger.error("Done, #{size} jobs in #{ending} sec, #{(size / ending).to_i} jobs/sec") Sidekiq.logger.error("Ending RSS: #{Process.rss}") Sidekiq.logger.error("Now here's the latency for three jobs") if ENV["AJ"] LoadJob.perform_later(1, Time.now.to_f) LoadJob.perform_later(2, Time.now.to_f) LoadJob.perform_later(3, Time.now.to_f) else LoadWorker.perform_async(1, Time.now.to_f) LoadWorker.perform_async(2, Time.now.to_f) LoadWorker.perform_async(3, Time.now.to_f) end sleep 0.1 @x.stop Process.kill("INT", $$) break end end end end def with_latency(latency, &block) Sidekiq.logger.error "Simulating #{latency}ms of latency between Sidekiq and redis" if latency > 0 Toxiproxy[:redis].downstream(:latency, latency: latency).apply(&block) else yield end end def run(name) Sidekiq.logger.warn("Starting #{name}") monitor if ENV["PROFILE"] RubyProf.exclude_threads = [@monitor] RubyProf.start elsif ENV["GC"] GC.start GC.compact GC.disable Sidekiq.logger.error("GC Start RSS: #{Process.rss}") end @start = Time.now with_latency(@latency) do @x.run while (readable_io = IO.select([@self_read])) signal = readable_io.first[0].gets.strip handle_signal(signal) end end # normal rescue Interrupt rescue => e raise e if $DEBUG warn e.message warn e.backtrace.join("\n") exit 1 ensure @x.stop end def done Sidekiq.logger.error("GC End RSS: #{Process.rss}") if ENV["GC"] if ENV["PROFILE"] Sidekiq.logger.error("Profiling...") result = RubyProf.stop printer = RubyProf::GraphHtmlPrinter.new(result) printer.print(File.new("output.html", "w"), min_percent: 1) end end end ll = Loader.new ll.configure unless ENV["GC"] || ENV["PROFILE"] ll.setup ll.run("warmup") end ll.setup ll.run("ideal") ll.done