#!/usr/bin/env ruby

# Quiet some warnings we see when running in warning mode:
# RUBYOPT=-w bundle exec sidekiq
$TESTING = false

#require 'ruby-prof'
Bundler.require(:default)

require 'sidekiq/cli'
require 'sidekiq/launcher'
require 'optparse'
require_relative '../lib/atomic-sidekiq'

# Settings for the load run. Every entry is a default that the matching
# command-line switch below can override.
$options = {
  batches: 10,         # number of bulk pushes
  jobs: 1_000,         # jobs contained in each bulk push
  terminate: 0,        # per-job probability of killing the process
  flush: true,         # wipe the Redis database before enqueueing
  expiration: 2,       # expiration_time handed to atomic_fetch!
  atomic_fetch: false  # whether to enable the atomic-fetch fetcher
}

# Parse ARGV in place (parse! removes the switches it recognises) and store
# the converted values in $options.
cli = OptionParser.new
cli.banner = "Usage: sidekiqload [options]"

cli.on("-b n", "--batches=n", "Number of job batches to be created [default 10]") { |value| $options[:batches] = value.to_i }
cli.on("-j n", "--jobs=n", "Number of jobs in each batch [default 1000]") { |value| $options[:jobs] = value.to_i }
cli.on("-t n", "--terminate=n", "Probability of terminating the thread [default 0]") { |value| $options[:terminate] = value.to_f }
# Only the literal string 'true' enables flushing; any other value disables it.
cli.on("-f n", "--flush=n", "Flush all jobs that have been created before [default true]") { |value| $options[:flush] = (value == 'true') }
cli.on("-e n", "--expiration=n", "Expiration time in seconds for a job [default 2]") { |value| $options[:expiration] = value.to_i }
cli.on("-a", "--atomic-fetch", "Run the Sidekiq worker with the atomic-fetch fetcher") { |value| $options[:atomic_fetch] = value }

cli.parse!
# Mixed into the top-level object; the script relies on it for `watchdog`
# and `fire_event` further down.
include Sidekiq::Util

Sidekiq.configure_server do |config|
  #config.options[:concurrency] = 1
  config.redis = { db: 13 }
  config.options[:queues] << 'default'
  config.logger.level = Logger::ERROR
  config.average_scheduled_poll_interval = 2
  if $options[:atomic_fetch]
    config.atomic_fetch!({
      collection_interval: 10,
      expiration_time: $options[:expiration],
    })
  end
end

# Worker used by the load test. Each job records its completion in Redis at
# most once and may, with probability $options[:terminate], hard-kill the
# whole process to simulate a crash in the middle of a job.
class LoadWorker
  include Sidekiq::Worker
  sidekiq_options retry: 1
  sidekiq_retry_in do |_count|
    1
  end

  # @param batch [Integer] index of the batch this job was enqueued in
  # @param idx [Integer] index of the job within its batch
  def perform(batch, idx)
    if rand < $options[:terminate]
      Sidekiq.logger.error("Terminating on job #{idx}")
      Process.kill("KILL", Process.pid)
    end

    lock_key = "jobs:#{batch}:#{idx}"
    begin
      Sidekiq.redis do |conn|
        # The Lua script makes "mark this job done" idempotent: the 'done'
        # counter is only incremented the first time a given job key is seen.
        conn.eval(<<-LUA)
          local lock = redis.call('get', '#{lock_key}')
          if lock then
            return nil
          end
          redis.call('set', '#{lock_key}', '1')
          redis.call('incr', 'done')
          return nil
        LUA
      end
    rescue => e
      # Redis failures are logged and swallowed, so they do not trigger a
      # Sidekiq retry of the job.
      Sidekiq.logger.error(e)
    end
    #raise idx.to_s if idx % 100 == 1
  end
end

# brew tap shopify/shopify
# brew install toxiproxy
# gem install toxiproxy
#require 'toxiproxy'
# simulate a non-localhost network for realer-world conditions.
# adding 1ms of network latency has an ENORMOUS impact on benchmarks
#Toxiproxy.populate([{
    #"name": "redis",
    #"listen": "",
    #"upstream": ""
#}])

# Signal handlers only write the signal name into this pipe; the main loop
# at the bottom of the script reads from it and reacts there.
self_read, self_write = IO.pipe
%w(INT TERM TSTP TTIN).each do |sig|
  begin
    trap sig do
      puts("Killed with #{sig}")
      self_write.puts(sig)
    end
  rescue ArgumentError
    puts "Signal #{sig} not supported"
  end
end

if $options[:flush]
  puts "Flushing database..."
  Sidekiq.redis { |c| c.flushdb }
end

# Reacts to a signal name read from the self-pipe.
#
# @param launcher [Sidekiq::Launcher] the running launcher
# @param sig [String] signal name without the SIG prefix ('INT', 'TERM', 'TSTP' or 'TTIN')
# @raise [Interrupt] for INT and TERM
def handle_signal(launcher, sig)
  Sidekiq.logger.debug "Got #{sig} signal"
  case sig
  when 'INT', 'TERM'
    # INT: handle Ctrl-C in JRuby like MRI
    # http://jira.codehaus.org/browse/JRUBY-4637
    # TERM: Heroku sends TERM and then waits 10 seconds for process to exit.
    raise Interrupt
  when 'TSTP'
    Sidekiq.logger.info "Received TSTP, no longer accepting new work"
    launcher.quiet
  when 'TTIN'
    # Dump the backtrace of every live thread.
    Thread.list.each do |thread|
      Sidekiq.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread['label']}"
      if thread.backtrace
        Sidekiq.logger.warn thread.backtrace.join("\n")
      else
        Sidekiq.logger.warn ""
      end
    end
  end
end

# Value of the `rss` column that `ps` prints for this process, as an Integer.
def Process.rss
  `ps -o rss= -p #{Process.pid}`.chomp.to_i
end

batch_count = $options[:batches]
jobs_per_batch = $options[:jobs]

batch_count.times do |batch|
  # Argument order matches LoadWorker#perform(batch, idx).
  job_args = Array.new(jobs_per_batch) { |idx| [batch, idx] }
  Sidekiq::Client.push_bulk('class' => LoadWorker, 'args' => job_args)
end

created = jobs_per_batch * batch_count
# INCRBY updates the running total in one atomic step and returns the new value.
total_enqueued = Sidekiq.redis { |conn| conn.incrby('total_enqueued', created) }
# Progress is logged at error level; configure_server above sets the logger
# level to ERROR, presumably so only these lines show up.
Sidekiq.logger.error "Created #{created} jobs (total: #{total_enqueued})"

# Number of jobs still waiting: length of the default queue plus the size of
# the retry set.
def total
  qsize, retries = Sidekiq.redis do |conn|
    conn.pipelined do |pipeline|
      pipeline.llen "queue:default"
      pipeline.zcard "retry"
    end
  end.map(&:to_i)
  qsize + retries
end

# Number of keys matching 'flight:*', counted by walking the keyspace with
# SCAN until the cursor returns to 0.
def inflight
  cursor = 0
  counter = 0
  loop do
    cursor, keys = Sidekiq.redis { |c| c.scan(cursor, match: 'flight:*') }
    counter += keys.count
    cursor = cursor.to_i
    break if cursor.zero?
  end
  counter
end

# Once per second, logs memory and progress; exits the process when nothing
# is pending and nothing is in flight.
Monitoring = Thread.new do
  watchdog("monitor thread") do
    loop do
      sleep 1
      #GC.start
      pending = total
      flying = inflight
      Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{pending} Inflight: #{flying}")
      if pending == 0 && flying == 0
        Sidekiq.logger.error("Done")
        exit(0)
      end
    end
  end
end

begin
  # RubyProf::exclude_threads = [ Monitoring ]
  #RubyProf.start
  fire_event(:startup)
  #Sidekiq.logger.error "Simulating 1ms of latency between Sidekiq and redis"
  #Toxiproxy[:redis].downstream(:latency, latency: 1).apply do
    launcher = Sidekiq::Launcher.new(Sidekiq.options)
    launcher.run

    # Block until a signal handler writes to the self-pipe, then dispatch it.
    while readable_io = IO.select([self_read])
      signal = readable_io.first[0].gets.strip
      handle_signal(launcher, signal)
    end
  #end
rescue SystemExit => e
  # Sidekiq.logger.error("Profiling...")
  #result = RubyProf.stop
  #printer = RubyProf::GraphHtmlPrinter.new(result)
  #printer.print(File.new("output.html", "w"), :min_percent => 1)
  # normal
rescue => e
  raise if $DEBUG
  STDERR.puts e.message
  STDERR.puts e.backtrace.join("\n")
  exit 1
end