lib/tty/command/process_runner.rb in tty-command-0.4.0 vs lib/tty/command/process_runner.rb in tty-command-0.5.0

- old
+ new

@@ -9,87 +9,153 @@ module TTY class Command class ProcessRunner include Execute + # the command to be spawned + attr_reader :cmd + # Initialize a Runner object # # @param [Printer] printer # the printer to use for logging # # @api private - def initialize(printer) + def initialize(cmd, printer) + @cmd = cmd + @timeout = cmd.options[:timeout] + @input = cmd.options[:input] + @signal = cmd.options[:signal] || :TERM @printer = printer + @threads = [] + @lock = Mutex.new end # Execute child process # @api public - def run(cmd) - timeout = cmd.options[:timeout] + def run!(&block) @printer.print_command_start(cmd) start = Time.now + runtime = 0.0 - spawn(cmd) do |pid, stdin, stdout, stderr| - stdout_data, stderr_data = read_streams(cmd, stdout, stderr) + pid, stdin, stdout, stderr = spawn(cmd) # do |pid, stdin, stdout, stderr| - runtime = Time.now - start - handle_timeout(timeout, runtime, pid) - status = waitpid(pid) + # write and read streams + write_stream(stdin) + stdout_data, stderr_data = read_streams(stdout, stderr, &block) - @printer.print_command_exit(cmd, status, runtime) + status = waitpid(pid) + runtime = Time.now - start - Result.new(status, stdout_data, stderr_data) - end + @printer.print_command_exit(cmd, status, runtime) + + Result.new(status, stdout_data, stderr_data) + rescue + terminate(pid) + Result.new(-1, stdout_data, stderr_data) + ensure + [stdin, stdout, stderr].each { |fd| fd.close if fd && !fd.closed? } end + # Stop a process marked by pid + # + # @param [Integer] pid + # + # @api public + def terminate(pid) + ::Process.kill(@signal, pid) + end + private # @api private - def handle_timeout(timeout, runtime, pid) - return unless timeout + def handle_timeout(runtime) + return unless @timeout - t = timeout - runtime - if t < 0.0 - ::Process.kill(:KILL, pid) + t = @timeout - runtime + raise TimeoutExceeded if t < 0.0 + end + + # @api private + def write_stream(stdin) + return unless @input + writers = [stdin] + start = Time.now + + # wait when ready for writing to pipe + _, writable = IO.select(nil, writers, writers, @timeout) + raise TimeoutExceeded if writable.nil? + + while writers.any? + writable.each do |fd| + begin + err = nil + size = fd.write(@input) + @input = @input.byteslice(size..-1) + rescue Errno::EPIPE => err + end + if err || @input.bytesize == 0 + writers.delete(stdin) + end + + # control total time spent writing + runtime = Time.now - start + handle_timeout(runtime) + end end end + # Read stdout & stderr streams in the background + # + # @param [IO] stdout + # @param [IO] stderr + # # @api private - def read_streams(cmd, stdout, stderr) + def read_streams(stdout, stderr, &block) stdout_data = '' stderr_data = Truncator.new - timeout = cmd.options[:timeout] - stdout_thread = Thread.new do - begin - while (line = stdout.gets) - stdout_data << line - @printer.print_command_out_data(cmd, line) - end - rescue TimeoutExceeded - stdout.close + print_out = -> (cmd, line) { @printer.print_command_out_data(cmd, line) } + print_err = -> (cmd, line) { @printer.print_command_err_data(cmd, line) } + + stdout_yield = -> (line) { block.(line, nil) if block } + stderr_yield = -> (line) { block.(nil, line) if block } + + @threads << read_stream(stdout, stdout_data, print_out, stdout_yield) + @threads << read_stream(stderr, stderr_data, print_err, stderr_yield) + + @threads.each do |th| + result = th.join(@timeout) + if result.nil? + @threads[0].raise + @threads[1].raise end end - stderr_thread = Thread.new do + [stdout_data, stderr_data.read] + end + + def read_stream(stream, data, print_callback, callback) + Thread.new do + Thread.current[:cmd_start] = Time.now begin - while (line = stderr.gets) - stderr_data << line - @printer.print_command_err_data(cmd, line) + while (line = stream.gets) + @lock.synchronize do + data << line + callback.(line) + print_callback.(cmd, line) + end + + # control total time spent reading + runtime = Time.now - Thread.current[:cmd_start] + handle_timeout(runtime) end - rescue TimeoutExceeded - stderr.close + rescue => err + raise err + ensure + stream.close end end - - [stdout_thread, stderr_thread].each do |th| - result = th.join(timeout) - if result.nil? - stdout_thread.raise(TimeoutExceeded) - stderr_thread.raise(TimeoutExceeded) - end - end - [stdout_data, stderr_data.read] end # @api private def waitpid(pid) ::Process.waitpid(pid, Process::WUNTRACED)