#!/usr/bin/env ruby

# Define $TESTING up front so that running in warning mode
# (RUBYOPT=-w bundle exec sidekiq) produces less noise.
$TESTING = false
puts RUBY_DESCRIPTION
# Echo the knobs that shape this run so they appear at the top of the output.
env_summary = %w[THREADS LATENCY AJ PROFILE].map do |key|
  "#{key}: #{ENV.fetch(key, "nil")}"
end
puts(env_summary.join(", "))

require "ruby-prof" if ENV["PROFILE"]
require "bundler/setup"
Bundler.require(:default, :load_test)

# LATENCY defaults to 1; a value of 0 (or less) skips the proxy setup below.
latency = Integer(ENV.fetch("LATENCY", 1))
if latency.positive?
  # Prerequisites:
  #   brew tap shopify/shopify
  #   brew install toxiproxy
  # then run `toxiproxy-server` in a separate terminal window.
  require "toxiproxy"
  # Simulate a non-localhost network for more realistic conditions:
  # adding just 1ms of network latency has an ENORMOUS impact on benchmarks.
  redis_proxy = {
    name: "redis",
    listen: "127.0.0.1:6380",
    upstream: "127.0.0.1:6379"
  }
  Toxiproxy.populate([redis_proxy])
end

# Optional ActiveJob mode: only loaded when AJ is set in the environment.
if ENV["AJ"]
  require "active_job"
  puts "Using ActiveJob #{ActiveJob::VERSION::STRING}"
  ActiveJob::Base.queue_adapter = :sidekiq
  ActiveJob::Base.logger.level = Logger::WARN

  # No-op ActiveJob used to generate load. When a timestamp is supplied it
  # prints the seconds elapsed between that timestamp and execution.
  class LoadJob < ActiveJob::Base
    # idx - job index (unused by the body)
    # ts  - optional Float epoch timestamp taken at enqueue time
    def perform(idx, ts = nil)
      return if ts.nil?

      puts(Time.now.to_f - ts)
    end
  end
end

# No-op Sidekiq job used to generate load. When a timestamp is supplied it
# prints the seconds elapsed between that timestamp and execution.
class LoadWorker
  include Sidekiq::Job

  sidekiq_options retry: 1
  # Constant retry delay regardless of the argument passed to the block.
  sidekiq_retry_in { |_count| 1 }

  # idx - job index (only used by the commented-out failure injection below)
  # ts  - optional Float epoch timestamp taken at enqueue time
  def perform(idx, ts = nil)
    return if ts.nil?

    puts(Time.now.to_f - ts)
    # raise idx.to_s if idx % 100 == 1
  end
end

# Returns the integer that `ps -o rss=` reports for the current process
# (0 if the output does not start with a number).
def Process.rss
  ps_output = `ps -o rss= -p #{pid}`
  ps_output.chomp.to_i
end

# Drives a load-test pass: configures an embedded Sidekiq instance, enqueues
# @iter * @count jobs, times how long the default queue takes to drain and
# logs throughput and RSS (plus an optional ruby-prof report).
#
# Environment variables read by this class:
#   GC      - when set, use 10 batches instead of 500 and run with GC disabled
#   COUNT   - jobs per batch (default 1_000)
#   LATENCY - latency passed to Toxiproxy; 0 connects to port 6379 directly (default 1)
#   THREADS - Sidekiq concurrency (default 10)
#   AJ      - when set, enqueue LoadJob via ActiveJob instead of LoadWorker
#   PROFILE - when set, profile the run with ruby-prof and write output.html
class Loader
  def initialize
    @iter = ENV["GC"] ? 10 : 500
    @count = Integer(ENV["COUNT"] || 1_000)
    @latency = Integer(ENV["LATENCY"] || 1)
  end

  # Builds the embedded Sidekiq instance (kept in @x) and installs signal
  # traps. The trap handlers only write the signal name to a pipe; the actual
  # handling happens in #run's select loop via #handle_signal.
  def configure
    @x = Sidekiq.configure_embed do |config|
      # Port 6380 is the Toxiproxy listener; 6379 is Redis itself.
      config.redis = {db: 13, port: ((@latency > 0) ? 6380 : 6379)}
      config.concurrency = Integer(ENV.fetch("THREADS", "10"))
      # config.redis = { db: 13, port: 6380, driver: :hiredis}
      config.queues = %w[default]
      config.logger.level = Logger::WARN
      config.average_scheduled_poll_interval = 2
      config.reliable! if defined?(Sidekiq::Pro)
    end

    @self_read, @self_write = IO.pipe
    %w[INT TERM TSTP TTIN].each do |sig|
      trap sig do
        @self_write.puts(sig)
      end
    rescue ArgumentError
      puts "Signal #{sig} not supported"
    end
  end

  # Reacts to a signal name read from the self-pipe.
  #
  # sig - one of "INT", "TERM", "TSTP", "TTIN"; anything else is ignored.
  #
  # Raises Interrupt for INT and TERM, which #run rescues as its normal exit.
  def handle_signal(sig)
    launcher = @x
    Sidekiq.logger.debug "Got #{sig} signal"
    case sig
    when "INT"
      # Handle Ctrl-C in JRuby like MRI
      # http://jira.codehaus.org/browse/JRUBY-4637
      raise Interrupt
    when "TERM"
      # Heroku sends TERM and then waits 30 seconds for process to exit.
      raise Interrupt
    when "TSTP"
      Sidekiq.logger.info "Received TSTP, no longer accepting new work"
      launcher.quiet
    when "TTIN"
      # Dump a backtrace for every live thread.
      Thread.list.each do |thread|
        Sidekiq.logger.warn "Thread TID-#{(thread.object_id ^ ::Process.pid).to_s(36)} #{thread["label"]}"
        if thread.backtrace
          Sidekiq.logger.warn thread.backtrace.join("\n")
        else
          Sidekiq.logger.warn "<no backtrace available>"
        end
      end
    end
  end

  # Flushes the Redis database and enqueues @iter batches of @count jobs,
  # either one at a time through ActiveJob (AJ set) or with one bulk push
  # per batch.
  def setup
    Sidekiq.logger.error("Setup RSS: #{Process.rss}")
    Sidekiq.redis { |c| c.flushdb }
    start = Time.now
    if ENV["AJ"]
      @iter.times do
        @count.times do |idx|
          LoadJob.perform_later(idx)
        end
      end
    else
      @iter.times do
        arr = Array.new(@count) { |idx| [idx] }
        Sidekiq::Client.push_bulk("class" => LoadWorker, "args" => arr)
      end
    end
    Sidekiq.logger.warn "Created #{@count * @iter} jobs in #{Time.now - start} sec"
  end

  # Starts a background thread (kept in @monitor) that polls the length of
  # "queue:default" every 0.2s. Once the queue is empty it logs throughput
  # and RSS, enqueues three timestamped jobs so their latency gets printed,
  # stops Sidekiq and sends INT to this process so #run's loop terminates.
  def monitor
    @monitor = Thread.new do
      GC.start
      loop do
        sleep 0.2
        total = Sidekiq.redis do |conn|
          conn.llen "queue:default"
        end
        if total == 0
          # @start is assigned by #run after this thread has been spawned.
          ending = Time.now - @start
          size = @iter * @count
          Sidekiq.logger.error("Done, #{size} jobs in #{ending} sec, #{(size / ending).to_i} jobs/sec")
          Sidekiq.logger.error("Ending RSS: #{Process.rss}")
          Sidekiq.logger.error("Now here's the latency for three jobs")

          if ENV["AJ"]
            LoadJob.perform_later(1, Time.now.to_f)
            LoadJob.perform_later(2, Time.now.to_f)
            LoadJob.perform_later(3, Time.now.to_f)
          else
            LoadWorker.perform_async(1, Time.now.to_f)
            LoadWorker.perform_async(2, Time.now.to_f)
            LoadWorker.perform_async(3, Time.now.to_f)
          end

          sleep 0.1
          @x.stop
          Process.kill("INT", $$)
          break
        end
      end
    end
  end

  # Runs the block, wrapped in a Toxiproxy downstream latency toxic when
  # latency is positive; otherwise just yields.
  #
  # latency - Integer passed as the toxic's latency (logged as milliseconds)
  def with_latency(latency, &block)
    Sidekiq.logger.error "Simulating #{latency}ms of latency between Sidekiq and redis"
    if latency > 0
      Toxiproxy[:redis].downstream(:latency, latency: latency).apply(&block)
    else
      yield
    end
  end

  # Executes one pass: starts the monitor thread, optionally starts
  # profiling or disables GC, then runs Sidekiq and blocks on the self-pipe
  # dispatching signals until an Interrupt ends the pass.
  #
  # name - label used in the "Starting ..." log line
  #
  # Any other StandardError is printed and the process exits with status 1
  # (or the error is re-raised when $DEBUG is set). Sidekiq is always stopped.
  def run(name)
    Sidekiq.logger.warn("Starting #{name}")
    monitor

    if ENV["PROFILE"]
      RubyProf.exclude_threads = [@monitor]
      RubyProf.start
    elsif ENV["GC"]
      GC.start
      GC.compact
      GC.disable
      Sidekiq.logger.error("GC Start RSS: #{Process.rss}")
    end
    @start = Time.now
    with_latency(@latency) do
      @x.run

      while (readable_io = IO.select([@self_read]))
        signal = readable_io.first[0].gets.strip
        handle_signal(signal)
      end
    end
    # normal
  rescue Interrupt
  rescue => e
    raise e if $DEBUG
    warn e.message
    warn e.backtrace.join("\n")
    exit 1
  ensure
    @x.stop
  end

  # Final reporting: logs RSS when GC is set and, when PROFILE is set, stops
  # ruby-prof and writes its graph report to output.html.
  def done
    Sidekiq.logger.error("GC End RSS: #{Process.rss}") if ENV["GC"]
    if ENV["PROFILE"]
      Sidekiq.logger.error("Profiling...")
      result = RubyProf.stop
      printer = RubyProf::GraphHtmlPrinter.new(result)
      # Block form closes (and therefore flushes) the report file even if
      # printing raises, instead of leaking the handle.
      File.open("output.html", "w") do |file|
        printer.print(file, min_percent: 1)
      end
    end
  end
end

loader = Loader.new
loader.configure

# Plain benchmark runs get a warmup pass first; GC and PROFILE runs go
# straight to the single measured pass.
warmup_needed = !(ENV["GC"] || ENV["PROFILE"])
if warmup_needed
  loader.setup
  loader.run("warmup")
end

loader.setup
loader.run("ideal")
loader.done