#!/usr/bin/env ruby
# frozen_string_literal: true

# Runner to run integration specs in parallel

# Part of the integration specs run pristine, without bundler.
# If we ran this code via `bundle exec`, bundler would inject its own context into them,
# messing things up heavily.
raise 'This code needs to be executed WITHOUT bundle exec' if Kernel.const_defined?(:Bundler)

require 'open3'
require 'fileutils'
require 'pathname'
require 'tmpdir'
require 'etc'

ROOT_PATH = Pathname.new(File.expand_path(File.join(File.dirname(__FILE__), '../')))

# How many child processes with integration specs we want to run in parallel.
# When the value is high, there's a problem with thread allocation on GitHub CI, which is why
# we limit it there. Locally we can run a lot of them, as many have sleeps and do not use a lot
# of CPU.
CONCURRENCY = ENV.key?('CI') ? 5 : Etc.nprocessors * 2

# How many bytes of each child's stdout/stderr we keep for printing when a scenario fails.
# It is also the maximum size of a single non-blocking read from a child pipe.
MAX_BUFFER_OUTPUT = 51_200

# Abstraction around a single test scenario execution process
class Scenario
  # How long a scenario can run before we kill it
  # This is a fail-safe just in case something would hang
  MAX_RUN_TIME = 3 * 60 # 3 minutes tops

  # There are rare cases where Karafka may force shutdown for some of the integration cases
  # This includes exactly those
  EXIT_CODES = {
    default: [0].freeze,
    'consumption/worker_critical_error_behaviour.rb' => [0, 2].freeze,
    'shutdown/on_hanging_jobs_and_a_shutdown.rb' => [2].freeze,
    'shutdown/on_hanging_on_shutdown_job_and_a_shutdown.rb' => [2].freeze,
    'shutdown/on_hanging_listener_and_shutdown.rb' => [2].freeze
  }.freeze

  private_constant :MAX_RUN_TIME, :EXIT_CODES

  # Creates a scenario instance. The process itself is spawned only when #start is called.
  #
  # @param path [String] path to the scenario file
  def initialize(path)
    @path = path
    # Last MAX_BUFFER_OUTPUT bytes of the child's stdout and stderr (binary, as read from pipes)
    @stdout_tail = String.new
    @stderr_tail = String.new
    # Set once the process finished and its pipes were drained and closed
    @closed = false
  end

  # Starts running given scenario in a separate process
  def start
    @stdin, @stdout, @stderr, @wait_thr = Open3.popen3(init_and_build_cmd)
    @started_at = current_time
  end

  # @return [String] integration spec name (path relative to spec/integrations)
  def name
    @path.delete_prefix("#{ROOT_PATH}/spec/integrations/")
  end

  # @return [Boolean] true if spec is pristine
  def pristine?
    # If there is a Gemfile in a scenario directory, it means it is a pristine spec and we need
    # to run bundle install, etc in order to run it. Memoized, as the scheduler asks on every tick.
    return @pristine if defined?(@pristine)

    @pristine = File.exist?(File.join(File.dirname(@path), 'Gemfile'))
  end

  # @return [Boolean] did this scenario finish or is it still running
  def finished?
    return true if @closed

    terminate_if_overdue

    # Liveness is sampled BEFORE reading, so anything the process wrote right before exiting
    # is still collected by the drain below instead of being left behind in the pipe
    running = @wait_thr.alive?

    if running
      # We read continuously so the pipes won't fill up (our default logger prints to both
      # test.log and stdout). Once a pipe buffer is full, the child blocks on write and hangs.
      @stdout_tail = read_pipe(@stdout, @stdout_tail)
      @stderr_tail = read_pipe(@stderr, @stderr_tail)
    else
      drain_and_close
    end

    !running
  end

  # @return [Boolean] did this scenario finish successfully or not
  def success?
    EXIT_CODES.fetch(name) { EXIT_CODES[:default] }.include?(exit_code)
  end

  # @return [Integer] pid of the process of this scenario
  def pid
    @wait_thr.pid
  end

  # @return [Integer] exit code of the process running given scenario
  def exit_code
    # There may be no exit status if we killed the wait thread (value is nil then) or if the
    # process was terminated by a signal (exitstatus is nil then)
    @wait_thr.value&.exitstatus || 123
  end

  # Prints a status report when scenario is finished and its collected output if it failed
  def report
    if success?
      print "\e[32m.\e[0m"
    else
      puts
      puts "\e[31m[FAILED]\e[0m #{name}"
      puts "Exit code: #{exit_code}"
      puts @stdout_tail
      puts @stderr_tail
      puts
    end
  end

  private

  # Kills the wait thread and sends TERM to the process when it ran longer than MAX_RUN_TIME.
  # Killing the wait thread makes the scenario count as finished even if TERM is ignored.
  def terminate_if_overdue
    return if current_time - @started_at <= MAX_RUN_TIME

    @wait_thr.kill

    begin
      Process.kill('TERM', pid)
    rescue Errno::ESRCH
      # It may finish right after we want to kill it, that's why we ignore this
      nil
    end
  end

  # Appends data available on a child pipe to the given tail without blocking.
  #
  # @param io [IO] child process pipe
  # @param tail [String] output collected so far
  # @param drain [Boolean] keep reading until the pipe would block or reaches EOF, instead of
  #   performing a single read
  # @return [String] new tail, capped to the last MAX_BUFFER_OUTPUT bytes
  def read_pipe(io, tail, drain: false)
    loop do
      chunk = io.read_nonblock(MAX_BUFFER_OUTPUT, exception: false)
      # :wait_readable when nothing is available right now, nil on EOF
      break unless chunk.is_a?(String)

      tail += chunk
      tail = tail.byteslice(-MAX_BUFFER_OUTPUT, MAX_BUFFER_OUTPUT) if tail.bytesize > MAX_BUFFER_OUTPUT

      break unless drain
    end

    tail
  end

  # Collects whatever output is still buffered for a finished process and releases its pipes,
  # so file descriptors do not accumulate over hundreds of scenarios
  def drain_and_close
    @stdout_tail = read_pipe(@stdout, @stdout_tail, drain: true)
    @stderr_tail = read_pipe(@stderr, @stderr_tail, drain: true)

    [@stdin, @stdout, @stderr].each(&:close)
    @closed = true
  end

  # Sets up a proper environment for a given spec to run and returns the run command
  # @return [String] run command
  def init_and_build_cmd
    # If there is a Gemfile in a scenario directory, it means it is a pristine spec and we need
    # to run bundle install, etc in order to run it
    if pristine?
      scenario_dir = File.dirname(@path)
      # We copy the spec into a temp dir, not to pollute the spec location with logs, etc
      temp_dir = Dir.mktmpdir
      file_name = File.basename(@path)

      FileUtils.cp_r("#{scenario_dir}/.", temp_dir)

      <<~CMD
        cd #{temp_dir} &&
        KARAFKA_GEM_DIR=#{ROOT_PATH} \
        BUNDLE_AUTO_INSTALL=true \
        PRISTINE_MODE=true \
        bundle exec ruby -r #{ROOT_PATH}/spec/integrations_helper.rb #{file_name}
      CMD
    else
      <<~CMD
        KARAFKA_GEM_DIR=#{ROOT_PATH} \
        bundle exec ruby -r ./spec/integrations_helper.rb #{@path}
      CMD
    end
  end

  # @return [Float] current monotonic clock time
  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# Load all the specs
specs = Dir[ROOT_PATH.join('spec/integrations/**/*.rb')]

# If filters are provided, keep only the specs whose path contains every one of them
specs.select! { |spec| ARGV.all? { |filter| spec.include?(filter) } }

raise ArgumentError, "No integration specs with filters: #{ARGV.join(', ')}" if specs.empty?

# Randomize order
seed = (ENV['SEED'] || rand(0..10_000)).to_i

puts "Random seed: #{seed}"

scenarios = specs
            .shuffle(random: Random.new(seed))
            .map { |integration_test| Scenario.new(integration_test) }

pristine, regulars = scenarios.partition(&:pristine?)
active_scenarios = []
finished_scenarios = []

while finished_scenarios.size < scenarios.size
  # Fill every free slot on each tick instead of starting just one scenario per tick.
  # We can run only one pristine spec at a time due to concurrency issues within bundler.
  # Since they usually take longer than others, we start them as soon as there is a slot.
  while active_scenarios.size < CONCURRENCY
    # Assigned fresh on every pass: `while` does not open a new scope, so a conditionally
    # assigned local would otherwise keep the scenario picked in the previous pass
    pristine_allowed = active_scenarios.none?(&:pristine?)
    candidate = (pristine_allowed && pristine.pop) || regulars.pop

    break unless candidate

    candidate.start
    active_scenarios << candidate
  end

  active_scenarios.select(&:finished?).each do |exited|
    active_scenarios.delete(exited)
    exited.report
    finished_scenarios << exited
  end

  sleep(0.1)
end

failed_scenarios = finished_scenarios.reject(&:success?)

# Report once more on the failed scenarios.
# This only lists their names; their output was already printed when each of them finished.
if failed_scenarios.empty?
  puts
else
  puts "\nFailed scenarios:\n\n"

  failed_scenarios.each do |failed|
    puts "\e[31m[FAILED]\e[0m #{failed.name}"
  end

  puts

  # Exit with 1 if not all scenarios were successful
  exit 1
end