lib/tty/command/process_runner.rb in tty-command-0.6.0 vs lib/tty/command/process_runner.rb in tty-command-0.7.0

- old
+ new

@@ -1,60 +1,72 @@ # encoding: utf-8 # frozen_string_literal: true require 'thread' -require_relative 'execute' +require_relative 'child_process' require_relative 'result' require_relative 'truncator' module TTY class Command class ProcessRunner - include Execute - # the command to be spawned attr_reader :cmd # Initialize a Runner object # # @param [Printer] printer # the printer to use for logging # # @api private - def initialize(cmd, printer) + def initialize(cmd, printer, &block) @cmd = cmd @timeout = cmd.options[:timeout] @input = cmd.options[:input] @signal = cmd.options[:signal] || :TERM @printer = printer - @threads = [] - @lock = Mutex.new + @block = block end # Execute child process + # + # Write the input if provided to the child's stdin and read + # the contents of both the stdout and stderr. + # + # If a block is provided then yield the stdout and stderr content + # as its being read. + # # @api public - def run!(&block) + def run! @printer.print_command_start(cmd) start = Time.now runtime = 0.0 - pid, stdin, stdout, stderr = spawn(cmd) + pid, stdin, stdout, stderr = ChildProcess.spawn(cmd) - # write and read streams - write_stream(stdin) - stdout_data, stderr_data = read_streams(stdout, stderr, &block) + # no input to write, close child's stdin pipe + stdin.close if (@input.nil? || @input.empty?) && !stdin.nil? + readers = [stdout, stderr] + writers = [@input && stdin].compact + + while writers.any? + ready_readers, ready_writers = IO.select(readers, writers, [], @timeout) + raise TimeoutExceeded if ready_readers.nil? || ready_writers.nil? + + write_stream(ready_writers, writers) + end + + stdout_data, stderr_data = read_streams(stdout, stderr) + status = waitpid(pid) runtime = Time.now - start @printer.print_command_exit(cmd, status, runtime) Result.new(status, stdout_data, stderr_data, runtime) - rescue - terminate(pid) - Result.new(-1, stdout_data, stderr_data) ensure [stdin, stdout, stderr].each { |fd| fd.close if fd && !fd.closed? } end # Stop a process marked by pid @@ -74,83 +86,82 @@ t = @timeout - runtime raise TimeoutExceeded if t < 0.0 end + # Write the input to the process stdin + # # @api private - def write_stream(stdin) - return unless @input - writers = [stdin] + def write_stream(ready_writers, writers) start = Time.now - - # wait when ready for writing to pipe - _, writable = IO.select(nil, writers, writers, @timeout) - raise TimeoutExceeded if writable.nil? - - while writers.any? - writable.each do |fd| - begin - err = nil - size = fd.write(@input) - @input = @input.byteslice(size..-1) - rescue Errno::EPIPE => err - end - if err || @input.bytesize == 0 - writers.delete(stdin) - end - - # control total time spent writing - runtime = Time.now - start - handle_timeout(runtime) + ready_writers.each do |fd| + begin + err = nil + size = fd.write(@input) + @input = @input.byteslice(size..-1) + rescue IO::WaitWritable + rescue Errno::EPIPE => err + # The pipe closed before all input written + # Probably process exited prematurely + fd.close + writers.delete(fd) end + if err || @input.bytesize == 0 + fd.close + writers.delete(fd) + end + + # control total time spent writing + runtime = Time.now - start + handle_timeout(runtime) end end # Read stdout & stderr streams in the background # # @param [IO] stdout # @param [IO] stderr # # @api private - def read_streams(stdout, stderr, &block) + def read_streams(stdout, stderr) stdout_data = [] stderr_data = Truncator.new - print_out = -> (cmd, line) { @printer.print_command_out_data(cmd, line) } - print_err = -> (cmd, line) { @printer.print_command_err_data(cmd, line) } + out_buffer = -> (line) { + stdout_data << line + @printer.print_command_out_data(cmd, line) + @block.(line, nil) if @block + } - stdout_yield = -> (line) { block.(line, nil) if block } - stderr_yield = -> (line) { block.(nil, line) if block } + err_buffer = -> (line) { + stderr_data << line + @printer.print_command_err_data(cmd, line) + @block.(nil, line) if @block + } - @threads << read_stream(stdout, stdout_data, print_out, stdout_yield) - @threads << read_stream(stderr, stderr_data, print_err, stderr_yield) + stdout_thread = read_stream(stdout, out_buffer) + stderr_thread = read_stream(stderr, err_buffer) - @threads.each do |th| - result = th.join(@timeout) - if result.nil? - @threads[0].raise - @threads[1].raise - end - end + stdout_thread.join + stderr_thread.join [stdout_data.join, stderr_data.read] end - def read_stream(stream, data, print_callback, callback) + def read_stream(stream, buffer) Thread.new do Thread.current[:cmd_start] = Time.now begin while (line = stream.gets) - @lock.synchronize do - data << line - callback.(line) - print_callback.(cmd, line) - end + buffer.(line) # control total time spent reading runtime = Time.now - Thread.current[:cmd_start] handle_timeout(runtime) end + rescue Errno::EIO + # GNU/Linux `gets` raises when PTY slave is closed + nil rescue => err raise err ensure stream.close end