# frozen_string_literal: true

require "gouda/test_helper"
require "csv"

class GoudaTest < ActiveSupport::TestCase
  include AssertHelper

  # Runs before each test: clears the per-thread side-effect log that the test
  # jobs write to, makes sure an adapter is available for direct enqueue calls,
  # and runs every initializer registered on the Gouda Railtie.
  setup do
    Thread.current[:gouda_test_side_effects] = []
    @adapter = Gouda::Adapter.new unless @adapter
    Gouda::Railtie.initializers.each { |initializer| initializer.run }
  end

  # Base class for every job defined in this test case. It assigns a
  # Gouda::Adapter as the queue adapter; all the jobs below inherit from it.
  class GoudaTestJob < ActiveJob::Base
    self.queue_adapter = Gouda::Adapter.new
  end

  # Job with an observable side effect: every execution appends "did-run" to the
  # per-thread log which the setup block resets before each test.
  class IncrementJob < GoudaTestJob
    def perform
      Thread.current[:gouda_test_side_effects] << "did-run"
    end
  end

  # Plain job without any concurrency settings. Its perform returns a string
  # containing the class name, which tests compare against the execution result.
  class StandardJob < GoudaTestJob
    def perform
      "perform result of #{self.class}"
    end
  end

  # Job whose perform unconditionally raises a RuntimeError ("Some drama").
  class JobThatAlwaysFails < GoudaTestJob
    def perform
      raise "Some drama"
    end
  end

  # Job which defines enqueue_concurrency_key by hand. The key is a constant, so
  # all instances share it; the tests in this file expect only one such job to
  # be accepted for enqueueing at a time.
  class JobWithEnqueueConcurrency < GoudaTestJob
    def perform
      "perform result of #{self.class}"
    end

    # Constant key shared by every instance of this job class
    def enqueue_concurrency_key
      "conc"
    end
  end

  # Job with a constant enqueue concurrency key plus a settable cursor_position.
  # The cursor test in this file expects jobs sharing the key to still be
  # enqueued when their cursor positions differ.
  class JobWithEnqueueConcurrencyAndCursor < GoudaTestJob
    # Set by tests to stand in for a job-iteration cursor
    attr_accessor :cursor_position

    def perform
      "perform result of #{self.class}"
    end

    # Constant key shared by every instance of this job class
    def enqueue_concurrency_key
      "conc1"
    end
  end

  # Job configured through the Gouda concurrency extension with enqueue_limit: 1.
  # The key lambda evaluates to the job class name.
  class JobWithEnqueueConcurrencyViaGoudaAndEnqueueLimit < GoudaTestJob
    include Gouda::ActiveJobExtensions::Concurrency
    gouda_control_concurrency_with(enqueue_limit: 1, key: -> { self.class.to_s })
    def perform
      "perform result of #{self.class}"
    end
  end

  # Job configured through the Gouda concurrency extension with total_limit: 1.
  # The key lambda evaluates to the job class name. Used by the enqueue
  # concurrency tests in this file.
  class JobWithEnqueueConcurrencyViaGoudaAndTotalLimit < GoudaTestJob
    include Gouda::ActiveJobExtensions::Concurrency
    gouda_control_concurrency_with(total_limit: 1, key: -> { self.class.to_s })
    def perform
      "perform result of #{self.class}"
    end
  end

  # Job which defines execution_concurrency_key by hand. The key is a constant,
  # so all instances share it; the tests in this file expect at most one such
  # job to be checked out for execution at a time.
  class JobWithExecutionConcurrency < GoudaTestJob
    def perform
      "perform result of #{self.class}"
    end

    # Constant key shared by every instance of this job class
    def execution_concurrency_key
      "there-can-be-just-one"
    end
  end

  # Job configured through the Gouda concurrency extension with total_limit: 1.
  # The key lambda evaluates to the job class name. Used by the test which
  # checks the stored execution_concurrency_key values.
  class JobWithExecutionConcurrencyViaGoudaAndTotalLimit < GoudaTestJob
    include Gouda::ActiveJobExtensions::Concurrency
    gouda_control_concurrency_with(total_limit: 1, key: -> { self.class.to_s })
    def perform
      "perform result of #{self.class}"
    end
  end

  # Job configured through the Gouda concurrency extension with perform_limit: 1.
  # The key lambda evaluates to the job class name. Used by the test which
  # checks the stored execution_concurrency_key values.
  class JobWithExecutionConcurrencyViaGoudaAndPerformLimit < GoudaTestJob
    include Gouda::ActiveJobExtensions::Concurrency
    gouda_control_concurrency_with(perform_limit: 1, key: -> { self.class.to_s })
    def perform
      "perform result of #{self.class}"
    end
  end

  # Job assigned to the "heavy" queue instead of the default one; used by the
  # queue-name and queue-constraint tests.
  class HeavyJob < GoudaTestJob
    queue_as :heavy

    def perform
      "Heavy work"
    end
  end

  # Job which, while being performed, enqueues itself (the very same job
  # instance) four more times.
  class JobThatReenqueuesItself < GoudaTestJob
    def perform
      3.times { enqueue }
      # The fourth call stays the last expression so that perform keeps
      # returning whatever `enqueue` returns
      enqueue
    end
  end

  # Job which always raises Unfortunateness and declares `retry_on` for that
  # error with 5 attempts and no wait between them.
  class JobThatRetriesItself < GoudaTestJob
    # Error raised by every execution of this job
    class Unfortunateness < StandardError
    end

    retry_on Unfortunateness, attempts: 5, wait: 0

    def perform
      raise Unfortunateness, "Tranquilo!"
    end
  end

  # Job with an after_enqueue callback which appends "after-enq" to the
  # per-thread side-effect log, letting tests count how often the callback ran.
  class JobWithEnqueueCallback < GoudaTestJob
    # The callback receives the job, but only the side effect matters here
    after_enqueue { |_job| Thread.current[:gouda_test_side_effects] << "after-enq" }

    def perform
      "whatever"
    end
  end

  # Exercises the three adapter entry points: bulk, immediate and scheduled.
  test "is able to enqueue jobs" do
    workload_count = -> { Gouda::Workload.count }
    # 10 jobs in one bulk + 1 immediate + 1 scheduled = 12 workloads
    assert_changes_by(workload_count, exactly: 12) do
      @adapter.enqueue_all(Array.new(10) { StandardJob.new })
      @adapter.enqueue(StandardJob.new)
      @adapter.enqueue_at(StandardJob.new, 1) # second argument is the epoch timestamp
    end
  end

  # Checks the waiting_to_start scope as jobs with different delays and queues get added.
  test "reports jobs waiting to start" do
    waiting_count = -> { Gouda::Workload.waiting_to_start.count }
    assert_equal 0, waiting_count.call

    StandardJob.perform_later
    assert_equal 1, waiting_count.call

    # Negative wait: expected to be counted as waiting
    StandardJob.set(wait: -200).perform_later
    assert_equal 2, waiting_count.call

    # Positive wait: expected to be left out of the count
    StandardJob.set(wait: 200).perform_later
    assert_equal 2, waiting_count.call

    # A job on the "another" queue is excluded by the queue constraint
    StandardJob.set(queue: "another").perform_later
    all_but_another = Gouda::ExceptQueueConstraint.new(["another"])
    assert_equal 2, Gouda::Workload.waiting_to_start(queue_constraint: all_but_another).count
  end

  # Only one job per enqueue concurrency key may be enqueued at a time, whether
  # the duplicates arrive in the same bulk or in a later one.
  test "skips jobs with the same enqueue concurrency key, both within the same bulk and separately" do
    batch = Array.new(12) { JobWithEnqueueConcurrency.new }
    assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
      @adapter.enqueue_all(batch)
    end
    accepted, rejected = batch.partition(&:successfully_enqueued?)
    assert_equal 1, accepted.length
    assert_nil accepted.first.enqueue_error
    assert_kind_of ActiveJob::EnqueueError, rejected.first.enqueue_error

    # Attempt to enqueue one more - now it won't be filtered out at insert_all but
    # instead by the DB
    latecomer = JobWithEnqueueConcurrency.new
    @adapter.enqueue_all([latecomer])
    refute_predicate latecomer, :successfully_enqueued?
    assert_kind_of ActiveJob::EnqueueError, latecomer.enqueue_error

    # Mark the first job as done and make sure the next one enqueues correctly
    # (the presence of a job which has executed should not make the unique constraint fire)
    Gouda::Workload.update_all(state: "finished")
    after_finish = JobWithEnqueueConcurrency.new
    @adapter.enqueue_all([after_finish])
    assert_predicate after_finish, :successfully_enqueued?
  end

  # A scheduler-originated job may coexist with one immediate job carrying the
  # same enqueue concurrency key; every further duplicate must be rejected.
  test "allows 2 jobs with the same enqueue concurrency key if one of them comes from the scheduler" do
    build_scheduled = ->(scheduler_key) {
      JobWithEnqueueConcurrency.new.tap { |job| job.scheduler_key = scheduler_key }
    }

    scheduled = build_scheduled.call("sometime")
    immediate = JobWithEnqueueConcurrency.new
    immediate_dupe = JobWithEnqueueConcurrency.new
    scheduled_dupe = build_scheduled.call("sometime")
    # Different cron task, but still should not enqueue
    scheduled_other_task = build_scheduled.call("some other time")

    batch = [immediate, immediate_dupe, scheduled, scheduled_dupe, scheduled_other_task]
    assert_changes_by(-> { Gouda::Workload.count }, exactly: 2) do
      @adapter.enqueue_all(batch)
    end

    rejected = batch.reject(&:successfully_enqueued?)
    assert_equal 3, rejected.length
    assert_same_set [immediate_dupe, scheduled_dupe, scheduled_other_task], rejected
  end

  # Jobs sharing an enqueue concurrency key are still accepted when their
  # cursor positions differ; only an exact (key, cursor) duplicate is rejected.
  test "allows multiple jobs with the same enqueue concurrency but different job-iteration cursor position" do
    build_with_cursor = ->(position) {
      JobWithEnqueueConcurrencyAndCursor.new.tap { |job| job.cursor_position = position }
    }

    without_cursor = JobWithEnqueueConcurrencyAndCursor.new
    at_123 = build_with_cursor.call("123")
    at_123_dupe = build_with_cursor.call("123")
    at_456 = build_with_cursor.call("456")

    batch = [without_cursor, at_123, at_123_dupe, at_456]
    assert_changes_by(-> { Gouda::Workload.count }, exactly: 3) do
      @adapter.enqueue_all(batch)
    end

    rejected = batch.reject(&:successfully_enqueued?)
    assert_same_set [at_123_dupe], rejected
  end

  # The enqueue concurrency key keeps being honoured across separate bulks.
  test "skips jobs with the same enqueue concurrency key enqueued over multiple bulks" do
    workload_count = -> { Gouda::Workload.count }
    build_batch = -> { Array.new(12) { JobWithEnqueueConcurrency.new } + [StandardJob.new] }

    # First bulk: one keyed job plus the StandardJob get in
    assert_changes_by(workload_count, exactly: 2) do
      @adapter.enqueue_all(build_batch.call)
    end

    # Second bulk: the key is already taken, so only the StandardJob gets in
    assert_changes_by(workload_count, exactly: 1) do
      @adapter.enqueue_all(build_batch.call)
    end
  end

  # Same as the hand-written key tests, but with the key coming from
  # gouda_control_concurrency_with (total_limit and enqueue_limit variants).
  test "skips jobs with the same enqueue concurrency key picked from Gouda settings" do
    workload_count = -> { Gouda::Workload.count }

    # Per job class: one keyed job plus the StandardJob are expected to get in
    [JobWithEnqueueConcurrencyViaGoudaAndTotalLimit, JobWithEnqueueConcurrencyViaGoudaAndEnqueueLimit].each do |job_class|
      assert_changes_by(workload_count, exactly: 2) do
        batch = Array.new(12) { job_class.new } + [StandardJob.new]
        @adapter.enqueue_all(batch)
      end
    end
  end

  # A single enqueued job is checked out and performed, and ends up "finished".
  test "executes a job" do
    @adapter.enqueue_all([StandardJob.new])
    assert_equal 1, Gouda::Workload.where(state: "enqueued").count

    perform_one = -> { Gouda::Workload.checkout_and_perform_one(executing_on: "Unit test") }

    assert_equal "perform result of GoudaTest::StandardJob", perform_one.call
    assert_equal 1, Gouda::Workload.where(state: "finished").count

    # No more jobs to perform
    assert_nil perform_one.call
  end

  # A raising job hands the error back as the result of
  # checkout_and_perform_one and its workload still moves to "finished".
  test "marks a job as failed if it raises an exception" do
    @adapter.enqueue_all([JobThatAlwaysFails.new])
    assert_equal 1, Gouda::Workload.where(state: "enqueued").count

    perform_one = -> { Gouda::Workload.checkout_and_perform_one(executing_on: "Unit test") }

    assert_kind_of StandardError, perform_one.call
    assert_equal 1, Gouda::Workload.where(state: "finished").count

    # No more jobs to perform
    assert_nil perform_one.call
  end

  # after_enqueue callbacks run once per job, also when enqueueing in bulk.
  test "executes enqueue callbacks" do
    Gouda.in_bulk do
      JobWithEnqueueCallback.perform_later
      JobWithEnqueueCallback.perform_later
      JobWithEnqueueCallback.perform_later
    end

    recorded_side_effects = Thread.current[:gouda_test_side_effects]
    assert_equal ["after-enq", "after-enq", "after-enq"], recorded_side_effects
  end

  # Two simulated workers (fibers) compete for two jobs sharing an execution
  # concurrency key: the second job may only be checked out after the first
  # one has been performed.
  test "does not execute multiple jobs with the same execution concurrency key" do
    @adapter.enqueue_all([JobWithExecutionConcurrency.new, JobWithExecutionConcurrency.new])

    assert_equal 2, Gouda::Workload.where(state: "enqueued").count

    # Each resume of a worker fiber performs exactly one checkout attempt and
    # hands back the workload (or nil)
    spawn_worker = ->(worker_name) {
      Fiber.new do
        loop do
          Fiber.yield(Gouda::Workload.checkout_and_lock_one(executing_on: worker_name))
        end
      end
    }
    first_worker = spawn_worker.call("worker1")
    second_worker = spawn_worker.call("worker2")

    first_checkout = first_worker.resume
    second_checkout = second_worker.resume
    assert_kind_of Gouda::Workload, first_checkout
    assert_nil second_checkout

    first_checkout.perform_and_update_state!
    second_checkout = second_worker.resume
    assert_kind_of Gouda::Workload, second_checkout

    assert_nil first_worker.resume
    assert_nil second_worker.resume
  end

  # With both perform_limit and total_limit the configured key (the job class
  # name) is stored in the execution_concurrency_key column.
  test "extracts execution concurrency key using Gouda configuration" do
    jobs = [
      JobWithExecutionConcurrencyViaGoudaAndPerformLimit.new,
      JobWithExecutionConcurrencyViaGoudaAndTotalLimit.new
    ]
    @adapter.enqueue_all(jobs)

    stored_keys = Gouda::Workload.pluck(:execution_concurrency_key)
    assert_same_set [
      "GoudaTest::JobWithExecutionConcurrencyViaGoudaAndPerformLimit",
      "GoudaTest::JobWithExecutionConcurrencyViaGoudaAndTotalLimit"
    ], stored_keys
  end

  # A job with scheduled_at in the future is stored right away, but can only be
  # checked out once that time has passed.
  test "enqueues a job with a set scheduled_at" do
    delay_seconds = 2
    job = StandardJob.new
    job.scheduled_at = Time.now.utc + delay_seconds
    assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
      @adapter.enqueue_all([job])
    end

    # That job is delayed and will not SELECT now
    assert_nil Gouda::Workload.checkout_and_lock_one(executing_on: "worker1")
    # Wait until just past the scheduled time. The wait is derived from the delay
    # (plus 1s of margin) so the test does not stall the suite longer than needed.
    sleep delay_seconds + 1
    assert_kind_of Gouda::Workload, Gouda::Workload.checkout_and_lock_one(executing_on: "worker1")
  end

  # A job passed to enqueue_at with a future epoch timestamp is stored right
  # away, but can only be checked out once that time has passed.
  test "enqueues a job with a `wait` via enqueue_at" do
    delay_seconds = 2
    job = StandardJob.new
    assert_changes_by(-> { Gouda::Workload.count }, exactly: 1) do
      @adapter.enqueue_at(job, _epoch = (Time.now.to_i + delay_seconds))
    end

    # That job is delayed and will not SELECT now
    assert_nil Gouda::Workload.checkout_and_lock_one(executing_on: "worker1")
    # Wait until just past the scheduled time. The wait is derived from the delay
    # (plus 1s of margin) so the test does not stall the suite longer than needed.
    sleep delay_seconds + 1
    assert_kind_of Gouda::Workload, Gouda::Workload.checkout_and_lock_one(executing_on: "worker1")
  end

  # perform_later without a delay stores a workload that can be checked out right away.
  test "is able to perform_later without a wait:" do
    workload_count = -> { Gouda::Workload.count }
    assert_changes_by(workload_count, exactly: 1) { StandardJob.perform_later }

    checked_out = Gouda::Workload.checkout_and_lock_one(executing_on: "worker1")
    assert_kind_of Gouda::Workload, checked_out
  end

  # perform_later with a delay stores a workload that cannot be checked out yet.
  test "is able to perform_later with wait:" do
    workload_count = -> { Gouda::Workload.count }
    assert_changes_by(workload_count, exactly: 1) { StandardJob.set(wait: 4).perform_later }

    # The job will not be selected as it has to wait
    checked_out = Gouda::Workload.checkout_and_lock_one(executing_on: "worker1")
    assert_nil checked_out
  end

  # Nested in_bulk blocks must enqueue every job exactly once (4 + 4 + 3 = 11).
  test "is able to bulk-enqueue" do
    enqueue_standard_jobs = ->(how_many) { how_many.times { StandardJob.perform_later } }

    assert_changes_by(-> { Gouda::Workload.count }, exactly: 11) do
      Gouda.in_bulk do
        enqueue_standard_jobs.call(4)
        Gouda.in_bulk do
          enqueue_standard_jobs.call(4)
          Gouda.in_bulk { enqueue_standard_jobs.call(3) }
        end
      end
    end
  end

  # The queue declared with queue_as ends up in the workload's queue_name.
  test "sets the correct queue with perform_later" do
    workload_count = -> { Gouda::Workload.count }
    assert_changes_by(workload_count, exactly: 1) { HeavyJob.perform_later }

    checked_out = Gouda::Workload.checkout_and_lock_one(executing_on: "worker1")
    assert_equal "heavy", checked_out.queue_name
  end

  # Jobs with randomly assigned priorities must be checked out in ascending
  # priority order.
  test "selects jobs in priority order" do
    priorities_ascending = [0, 4, 10, 25]
    n_jobs = 30

    assert_changes_by(-> { Gouda::Workload.count }, exactly: n_jobs) do
      prioritized_jobs = Array.new(n_jobs) do
        job = StandardJob.new
        job.priority = priorities_ascending.sample(random: case_random)
        job
      end
      @adapter.enqueue_all(prioritized_jobs)
    end

    checkout_order = Array.new(n_jobs) do
      Gouda::Workload.checkout_and_lock_one(executing_on: "worker1").priority
    end
    # Collapsing repeats must leave exactly the ascending list of priorities
    assert_equal priorities_ascending, checkout_order.uniq
  end

  # Performing JobThatReenqueuesItself yields 1 finished workload plus 4 fresh
  # ones, all sharing a single active_job_id.
  test "is able to perform a job which re-enqueues itself" do
    assert_changes_by(-> { Gouda::Workload.count }, exactly: 5) do
      JobThatReenqueuesItself.perform_later
      Gouda::Workload.checkout_and_perform_one(executing_on: "Unit test")
    end

    count_in_state = ->(state) { Gouda::Workload.where(state: state).count }
    assert_equal 4, count_in_state.call("enqueued")
    assert_equal 1, count_in_state.call("finished")

    distinct_job_ids = Gouda::Workload.select("DISTINCT active_job_id").count
    assert_equal 1, distinct_job_ids
  end

  # With `retry_on ... attempts: 5` the job is performed 5 times, each attempt
  # being its own workload, and all of them end up "finished".
  test "is able to perform a job which retries itself" do
    n_attempts = 5

    assert_changes_by(-> { Gouda::Workload.count }, exactly: n_attempts) do
      JobThatRetriesItself.perform_later
      n_attempts.times { Gouda::Workload.checkout_and_perform_one(executing_on: "Unit test") }
    end

    finished_count = Gouda::Workload.where(state: "finished").count
    assert_equal n_attempts, finished_count
  end

  # The interrupted_at value set on the job must be persisted on the workload
  # created by enqueueing it.
  test "saves the time of interruption upon enqueue when the interruption timestamp ivar is set" do
    freeze_time
    t = Time.now.utc
    job = StandardJob.new
    job.interrupted_at = t
    job.enqueue

    last_inserted_workload = Gouda::Workload.last
    # assert_equal takes (expected, actual); this order keeps failure messages truthful
    assert_equal t, last_inserted_workload.interrupted_at
  end

  # A job carrying an interruption timestamp must raise Gouda::InterruptError
  # instead of performing.
  test "raises in interrupt error when the the interruption timestamp ivar is set" do
    freeze_time
    interrupted_job = StandardJob.new
    interrupted_job.interrupted_at = Time.now.utc

    assert_raises(Gouda::InterruptError) { interrupted_job.perform_now }
  end

  # Checkout must only hand out workloads from the queues which the given
  # constraint allows.
  test "selects jobs according to the queue constraint" do
    queue_names = %w[foo bar baz]
    20.times do
      random_queue = queue_names.sample(random: case_random)
      StandardJob.set(queue: random_queue).perform_later
    end

    checkout_with = ->(constraint) {
      Gouda::Workload.checkout_and_lock_one(executing_on: "worker1", queue_constraint: constraint)
    }

    # No job was enqueued on the "bad" queue
    assert_nil checkout_with.call(Gouda::OnlyQueuesConstraint.new(["bad"]))

    from_foo = checkout_with.call(Gouda::OnlyQueuesConstraint.new(["foo"]))
    assert_equal "foo", from_foo.queue_name

    from_foo_or_baz = checkout_with.call(Gouda::OnlyQueuesConstraint.new(["foo", "baz"]))
    assert ["foo", "baz"].include?(from_foo_or_baz.queue_name)

    # Excluding "foo" and "baz" leaves only "bar"
    from_the_rest = checkout_with.call(Gouda::ExceptQueueConstraint.new(["foo", "baz"]))
    assert_equal "bar", from_the_rest.queue_name
  end

  # Seeds the queue with 100_000 jobs whose delays follow the distribution from
  # the CSV fixture read by with_sample_job_delays.
  # NOTE(review): this test makes no assertions and its slow_test! marker is
  # commented out - confirm whether it is meant to run on every suite run.
  test "has reasonable performance with a pre-seeded queue" do
    # slow_test!

    Gouda.in_bulk do
      with_sample_job_delays(n_jobs: 100_000) { |delay_seconds| StandardJob.set(wait: delay_seconds).perform_later }
    end
  end

  # Pruning removes finished workloads when preserve_job_records is off, and
  # leaves the ones which are still enqueued.
  test "prunes the finished workloads from the table" do
    # The setting is global: remember it so that it can be put back afterwards
    # and does not leak into other tests
    preserve_job_records_was = Gouda.config.preserve_job_records

    Gouda.in_bulk do
      250.times do
        StandardJob.perform_later
      end
    end
    Gouda.config.preserve_job_records = false
    # Pretend all 250 workloads finished 3 days ago
    Gouda::Workload.update_all(execution_finished_at: 3.days.ago, state: "finished")

    # These two stay enqueued and must survive the pruning
    Gouda.in_bulk do
      2.times do
        StandardJob.perform_later
      end
    end

    Gouda::Workload.prune
    assert_equal 2, Gouda::Workload.count
  ensure
    Gouda.config.preserve_job_records = preserve_job_records_was
  end

  # A failing execution must store the error's message, class name and
  # backtrace in the workload's error column.
  test "sets the error column on a job that fails" do
    JobThatRetriesItself.perform_later
    Gouda::Workload.checkout_and_perform_one(executing_on: "Unit test")

    errored = Gouda::Workload.where.not(error: {}).first
    # Fail with a readable message rather than a NoMethodError on nil
    refute_nil errored, "expected a workload with a non-empty error column"
    # assert_equal takes (expected, actual); this order keeps failure messages truthful
    assert_equal "Tranquilo!", errored.error["message"]
    assert_equal "GoudaTest::JobThatRetriesItself::Unfortunateness", errored.error["class_name"]
    refute_empty errored.error["backtrace"]
  end

  # Load test (skipped by default): fills the queue with n_bulks * bulk_size jobs
  # scheduled up to 3 seconds ahead, lets n_workers forked consumers process them
  # for 5 seconds, then prints timing statistics for the checkout+perform cycle.
  test "has reasonable performance" do
    skip "This test is very intense (and slow!)"

    Dir.mktmpdir do |tmpdir_path|
      n_bulks = 100
      n_workers = 15
      bulk_size = 1000

      # Fill up the queue with data
      rng = Random.new
      n_bulks.times do |n|
        active_jobs = bulk_size.times.map do
          StandardJob.new.tap do |job|
            job.scheduled_at = Time.now.utc + rng.rand(3.0)
          end
        end
        @adapter.enqueue_all(active_jobs)
      end

      # Running the processing in threads is not very conducive to performance
      # measurement as there is the GIL to contend with, and this test gets real busy.
      # Let's use processes instead.
      consumer_pids = n_workers.times.map do |n|
        fork do
          # Each consumer writes one duration (in seconds) per line into its own file
          File.open(File.join(tmpdir_path, "bj_test_stat_#{n}.bin"), "w") do |f|
            ActiveRecord::Base.connection.execute("SET statement_timeout TO '30s'")
            loop do
              t = Process.clock_gettime(Process::CLOCK_MONOTONIC)

              did_perform = Gouda::Workload.checkout_and_perform_one(executing_on: "perf")
              # A falsy result ends this consumer's loop
              break unless did_perform

              dt = Process.clock_gettime(Process::CLOCK_MONOTONIC) - t
              f.puts(dt.to_s)
            end
          end
        end
      end

      # Let the consumers run for 5 seconds
      sleep 5

      # Then terminate them
      consumer_pids.each do |pid|
        Process.kill("TERM", pid)
        Process.wait(pid)
      end
      # and collect stats
      times_to_consume = Dir.glob(File.join(tmpdir_path, "bj_test_stat_*.*")).flat_map do |path|
        values = File.read(path).split("\n").map { |sample| sample.to_f }
        values.tap { File.unlink(path) }
      end

      warn "Take+perform with #{n_workers} workers and #{n_bulks * bulk_size} jobs in queue: #{sample_description(times_to_consume)}"
      # No workload rows may disappear: the table must still hold every job that was enqueued
      assert_equal n_bulks * bulk_size, Gouda::Workload.count
    end
  end

  # Runs the real worker loop with 3 threads until the queue is empty.
  test "actually executes workloads" do
    jobs_to_enqueue = {
      StandardJob => 4,
      JobThatAlwaysFails => 2,
      JobWithExecutionConcurrency => 4,
      JobWithEnqueueConcurrency => 6
    }
    jobs_to_enqueue.each do |job_class, how_many|
      how_many.times { job_class.perform_later }
    end

    Gouda.worker_loop(n_threads: 3, check_shutdown: Gouda::EmptyQueueShutdownCheck.new)

    # 4 + 2 + 4 + 1: the six jobs sharing one enqueue concurrency key are
    # expected to contribute a single workload
    assert_equal 11, Gouda::Workload.where(state: "finished").count
  end

  # Covers the three forms of the constraint string: wildcard, exclusion
  # ("-name") and a comma-separated list of queue names.
  test "parses queue constraints" do
    wildcard = Gouda.parse_queue_constraint("*")
    assert_equal Gouda::AnyQueue, wildcard

    exclusion = Gouda.parse_queue_constraint("-heavy")
    assert_kind_of Gouda::ExceptQueueConstraint, exclusion
    assert_equal "queue_name NOT IN ('heavy')", exclusion.to_sql.strip

    inclusion = Gouda.parse_queue_constraint("foo,bar")
    assert_kind_of Gouda::OnlyQueuesConstraint, inclusion
    assert_equal "queue_name IN ('foo','bar')", inclusion.to_sql.strip
  end

  # checkout_and_perform_one must only pick jobs which the queue constraint
  # allows: first only "heavy", then everything except "heavy".
  test "checkout and performs from the right queue only" do
    heavy_constraint = Gouda.parse_queue_constraint("heavy")
    except_heavy_constraint = Gouda.parse_queue_constraint("-heavy")
    heavy = HeavyJob.perform_later
    light = StandardJob.perform_later

    # Expected set first, consistent with the other assert_same_set calls in this file
    assert_same_set [heavy.job_id, light.job_id], Gouda::Workload.enqueued.pluck(:active_job_id)
    assert_changes -> { Gouda::Workload.finished.count }, to: 1 do
      Gouda::Workload.checkout_and_perform_one(executing_on: "test", queue_constraint: heavy_constraint)
    end
    # The only heavy job is done, so a second attempt must not pick the light one
    assert_no_changes -> { Gouda::Workload.finished.count } do
      Gouda::Workload.checkout_and_perform_one(executing_on: "test", queue_constraint: heavy_constraint)
    end

    assert_equal 1, Gouda::Workload.finished.count
    assert_equal heavy.job_id, Gouda::Workload.finished.first.active_job_id

    heavy2 = HeavyJob.perform_later
    assert_same_set [heavy2.job_id, light.job_id], Gouda::Workload.enqueued.pluck(:active_job_id)

    assert_changes -> { Gouda::Workload.finished.count }, to: 2 do
      Gouda::Workload.checkout_and_perform_one(executing_on: "test", queue_constraint: except_heavy_constraint)
    end
    # The light job is done, so a second attempt must not pick the new heavy one
    assert_no_changes -> { Gouda::Workload.finished.count } do
      Gouda::Workload.checkout_and_perform_one(executing_on: "test", queue_constraint: except_heavy_constraint)
    end

    assert_equal 2, Gouda::Workload.finished.count
    assert_same_set [heavy.job_id, light.job_id], Gouda::Workload.finished.pluck(:active_job_id)
  end

  # Once a JobFuse exists for a job class, its workloads are still checked out
  # and marked as finished, but the job body is not run.
  test "picks up fused jobs, but marks them as finished right away without execuing the job code" do
    finished_count = -> { Gouda::Workload.finished.count }
    perform_one = -> { Gouda::Workload.checkout_and_perform_one(executing_on: "test") }

    # Without a fuse the job runs and records its side effect
    IncrementJob.perform_later
    assert_changes(finished_count, to: 1) { perform_one.call }

    # With the fuse in place the workload finishes without the side effect
    Gouda::JobFuse.create!(active_job_class_name: "GoudaTest::IncrementJob")
    IncrementJob.perform_later
    assert_changes(finished_count, to: 2) { perform_one.call }

    # Only the first, unfused execution left a marker
    assert_equal ["did-run"], Thread.current[:gouda_test_side_effects]
  end

  # Gouda.instrument must publish a "<name>.gouda" notification which carries
  # the given payload.
  test "instrumentation" do
    event_payload = {size: 1, job_class: "test_class"}

    received = subscribed_notification_for("workloads_revived_counter.gouda") do
      Gouda.instrument(:workloads_revived_counter, event_payload)
    end

    assert_equal "test_class", received[:job_class]
    assert_equal 1, received[:size]
  end

  # Summarizes a sample of numeric measurements in a single human-readable line.
  #
  # @param sample [Array<#to_f>] the measurements, in any order
  # @return [String] the sample size, mean, max, p90/p95/p99 and the sample
  #   standard deviation, each printed with 3 decimals. For an empty sample only
  #   the size is reported, as none of the statistics are defined.
  def sample_description(sample)
    values = sample.map(&:to_f).sort
    n = values.size
    # Nothing to summarize, e.g. when no consumer managed to finish a single job
    return "sample_size: 0 (no samples collected)" if n.zero?

    max = values.last
    mean = values.sum / n

    # Sample standard deviation (n - 1 denominator). It is computed from the
    # squared deviations, which can never sum to a negative number, so Math.sqrt
    # is safe. A single value has no spread, and dividing by n - 1 would give NaN.
    std_dev = if n > 1
      Math.sqrt(values.sum { |x| (x - mean)**2 } / (n - 1))
    else
      0.0
    end

    # Percentile with linear interpolation between the two closest ranks
    percentile = ->(fraction) {
      return values.first if n == 1

      rank = fraction * (n - 1) + 1
      k = rank.floor - 1
      f = rank.modulo(1)
      # At fraction 1.0 there is no element above the last one
      upper = values[k + 1] || values[k]
      values[k] + (f * (upper - values[k]))
    }
    p90 = percentile.call(0.9)
    p95 = percentile.call(0.95)
    p99 = percentile.call(0.99)

    "sample_size: %d, mean: %0.3f, max: %0.3f, p90: %0.3f, p95: %0.3f, p99: %0.3f, stddev: %0.3f" % [n, mean, max, p90, p95, p99, std_dev]
  end

  # Yields a delay (in seconds) once for each job of a simulated workload.
  #
  # The distribution comes from "seconds_to_start_distribution.csv" next to this
  # file: after a header row, every row holds a job count and a delay. The
  # counts are scaled so that they add up to roughly n_jobs (each bucket is
  # rounded on its own).
  #
  # @param n_jobs [Integer] the approximate total number of delays to yield
  # @yieldparam delay_seconds [Integer] the delay for one job
  def with_sample_job_delays(n_jobs:)
    csv_path = File.join(File.dirname(__FILE__), "seconds_to_start_distribution.csv")
    _header, *data_rows = CSV.read(csv_path)

    buckets = data_rows.map { |count, delay| [count.to_i, delay.to_i] }
    total_count = buckets.sum { |count, _delay| count }
    scale = n_jobs / total_count.to_f

    buckets.each do |count, delay|
      (scale * count).round.times { yield(delay) }
    end
  end
end