#!/usr/bin/env ruby

# Runner for running given benchmark cases
#
# Some of the cases require pre-populated data and we populate this in places that need it.
# In other cases we generate this data in a background process, so the partitions data stream
# is consistent and we don't end up consuming huge batches of a single partition.
#
# Usage:
#   - pass an optional first argument to run only benchmarks whose path contains that substring
#   - set the SEED env variable to (re)create the benchmark topics and populate them with data

require 'open3'
require 'pathname'

$LOAD_PATH.unshift(File.dirname(__FILE__))
$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..'))

ROOT_PATH = Pathname.new(File.expand_path(File.join(File.dirname(__FILE__), '../')))

# Benchmark topics mapped to the number of partitions each one is created with
BENCHMARK_TOPICS = {
  'benchmarks_00_01' => 1,
  'benchmarks_00_05' => 5,
  'benchmarks_01_05' => 5,
  'benchmarks_00_10' => 10
}.freeze

filter = ARGV[0]

# Load all the benchmarks
benchmarks = Dir[ROOT_PATH.join('spec/benchmarks/**/*.rb')]

# If a filter is provided, keep only the benchmarks whose path contains it
benchmarks.select! { |path| path.include?(filter) } if filter

raise ArgumentError, "No benchmarks with filter: #{filter}" if benchmarks.empty?

# We may skip seeding if we are running the benchmarks multiple times, then since we do not
# commit offsets we can skip generating more data
if ENV['SEED']
  require 'spec/benchmarks_helper'

  # We need to setup karafka here to have producer for data seeding
  setup_karafka

  # This takes some time but needs to run only once per benchmark session
  puts 'Seeding benchmarks data...'

  producer = Karafka::App.producer

  # We make our data json compatible so we can also benchmark serialization
  elements = Array.new(100_000) { { a: :b }.to_json }

  topics = Karafka::Admin.cluster_info.topics.map { |details| details.fetch(:topic_name) }

  # Drop (when present) and recreate every benchmark topic with its configured partition count
  BENCHMARK_TOPICS.each do |topic_name, partitions_count|
    ::Karafka::Admin.delete_topic(topic_name) if topics.include?(topic_name)
    ::Karafka::Admin.create_topic(topic_name, partitions_count, 1)
  end

  # We do not populate data of benchmarks_00_10 as we use it with live-stream data only
  %w[
    benchmarks_00_01
    benchmarks_00_05
  ].each do |topic_name|
    # Seed exactly the partitions the topic was just created with
    partitions_count = BENCHMARK_TOPICS.fetch(topic_name)

    partitions_count.times do |partition|
      puts "Seeding #{topic_name}:#{partition}"

      elements.each_slice(10_000) do |payloads|
        messages = payloads.map do |payload|
          { topic: topic_name, payload: payload, partition: partition }
        end

        producer.buffer_many(messages)
        producer.flush_sync
      end
    end
  end
end

# Selects requested benchmarks and runs them one after another
benchmarks.each do |benchmark_path|
  benchmark_name = benchmark_path.gsub("#{ROOT_PATH}/spec/benchmarks/", '')

  puts "Running #{benchmark_name}"

  benchmark = "bundle exec ruby -r ./spec/benchmarks_helper.rb #{benchmark_path}"

  Open3.popen3(benchmark) do |stdin, stdout, stderr, wait_thread|
    # Nothing is ever written to the child's stdin, so close it right away
    stdin.close

    # Forward both output streams concurrently so neither pipe can fill up and block the child
    readers = [stdout, stderr].map do |stream|
      Thread.new do
        while (line = stream.gets)
          puts(line)
        end
      rescue IOError
        # The stream was closed underneath us; there is nothing more to forward
      end
    end

    status = wait_thread.value

    # popen3 closes the pipes once this block returns, so let the readers drain
    # everything the benchmark printed before leaving; otherwise trailing output is lost
    readers.each(&:join)

    warn "Benchmark #{benchmark_name} exited with #{status}" unless status.success?
  end
end