require "posix/spawn" require "thread" require "cc/analyzer/container/result" module CC module Analyzer # # Running an abstract docker container # # Input: # - image # - name # - command (Optional) # # Output: # - Result # - exit_status # - timed_out? # - duration # - maximum_output_exceeded? # - output_byte_count # - stderr # # Never raises (unless broken) # class Container DEFAULT_TIMEOUT = 15 * 60 # 15m DEFAULT_MAXIMUM_OUTPUT_BYTES = 500_000_000 def initialize(image:, name:, command: nil) @image = image @name = name @command = command @timed_out = false @maximum_output_exceeded = false @stdout_io = StringIO.new @stderr_io = StringIO.new @output_byte_count = 0 @counter_mutex = Mutex.new # By default accumulate and include stdout in result @output_delimeter = "\n" @on_output = ->(output) { @stdout_io.puts(output) } end def on_output(delimeter = "\n", &block) @output_delimeter = delimeter @on_output = block end def run(options = []) started = Time.now command = docker_run_command(options) Analyzer.logger.debug("docker run: #{command.inspect}") pid, _, out, err = POSIX::Spawn.popen4(*command) @t_out = read_stdout(out) @t_err = read_stderr(err) t_timeout = timeout_thread # blocks until the engine stops. this is put in a thread so that we can # explicitly abort it as part of #stop. otherwise a run-away container # could still block here forever if the docker-kill/wait is not # successful. there may still be stdout in flight if it was being # produced more quickly than consumed. @t_wait = Thread.new { _, @status = Process.waitpid2(pid) } @t_wait.join # blocks until all readers are done. they're still governed by the # timeout thread at this point. if we hit the timeout while processing # output, the threads will be Thread#killed as part of #stop and this # will unblock with the correct value in @timed_out [@t_out, @t_err].each(&:join) duration = if @timed_out timeout * 1000 else ((Time.now - started) * 1000).round end Result.new( container_name: @name, duration: duration, exit_status: @status&.exitstatus, maximum_output_exceeded: @maximum_output_exceeded, output_byte_count: output_byte_count, stderr: @stderr_io.string, stdout: @stdout_io.string, timed_out: @timed_out, ) ensure kill_reader_threads t_timeout&.kill end def stop(message = nil) reap_running_container(message) kill_reader_threads kill_wait_thread end private attr_reader :output_byte_count, :counter_mutex def docker_run_command(options) [ "docker", "run", "--name", @name, options, @image, @command ].flatten.compact end def read_stdout(out) Thread.new do out.each_line(@output_delimeter) do |chunk| output = chunk.chomp(@output_delimeter) Analyzer.logger.debug("engine stdout: #{output}") @on_output.call(output) check_output_bytes(output.bytesize) end ensure out.close end end def read_stderr(err) Thread.new do err.each_line do |line| Analyzer.logger.debug("engine stderr: #{line.chomp}") @stderr_io.write(line) check_output_bytes(line.bytesize) end ensure err.close end end def timeout_thread Thread.new do # Doing one long `sleep timeout` seems to fail sometimes, so # we do a series of short timeouts before exiting start_time = Time.now loop do sleep 10 duration = Time.now - start_time break if duration >= timeout end @timed_out = true stop("timed out") end.run end def check_output_bytes(last_read_byte_count) counter_mutex.synchronize do @output_byte_count += last_read_byte_count end if output_byte_count > maximum_output_bytes @maximum_output_exceeded = true stop("maximum output exceeded") end end def kill_reader_threads @t_out&.kill @t_err&.kill end def kill_wait_thread @t_wait&.kill end def reap_running_container(message) Analyzer.logger.warn("killing container name=#{@name} message=#{message.inspect}") POSIX::Spawn::Child.new("docker", "kill", @name, timeout: 2.minutes) POSIX::Spawn::Child.new("docker", "wait", @name, timeout: 2.minutes) rescue POSIX::Spawn::TimeoutExceeded Analyzer.logger.error("unable to kill container name=#{@name} message=#{message.inspect}") Analyzer.statsd.increment("container.zombie") Analyzer.statsd.increment("container.zombie.#{metric_name}") if metric_name end def timeout ENV.fetch("CONTAINER_TIMEOUT_SECONDS", DEFAULT_TIMEOUT).to_i end def maximum_output_bytes ENV.fetch("CONTAINER_MAXIMUM_OUTPUT_BYTES", DEFAULT_MAXIMUM_OUTPUT_BYTES).to_i end def metric_name if /^cc-engines-(?[^-]+)-(?[^-]+)-/ =~ @name "engine.#{engine}.#{channel}" elsif /^builder-(?[^-]+)-/ =~ @name "builder.#{action}" end end end end end