lib/kumogata/post_processing.rb in kumogata-0.3.13 vs lib/kumogata/post_processing.rb in kumogata-0.3.14

- old
+ new

@@ -133,37 +133,51 @@ command = evaluate_command_template(command, outputs) connect_tries = (ssh['connect_tries'] || 36).to_i retry_interval = (ssh['retry_interval'] || 5).to_i + stderr_orig = nil + begin - retryable(:tries => connect_tries, :on => Net::SSH::Disconnect, :sleep => retry_interval) do - Net::SSH.start(*args) {|ssh| ssh_exec!(ssh, command) } + stderr_orig = STDERR.dup + STDERR.reopen('/dev/null', 'w') + + begin + retryable(:tries => connect_tries, :on => Net::SSH::Disconnect, :sleep => retry_interval) do + Net::SSH.start(*args) {|ssh| ssh_exec!(ssh, command) } + end + rescue Net::SSH::HostKeyMismatch => e + e.remember_host! + retry end - rescue Net::SSH::HostKeyMismatch => e - e.remember_host! - retry + ensure + STDERR.reopen(stderr_orig) end end def ssh_exec!(ssh, command) stdout_data = '' stderr_data = '' exit_code = nil #exit_signal = nil + stdout_stream = create_stdout_stream + stderr_stream = create_stderr_stream + ssh.open_channel do |channel| channel.exec(command) do |ch, success| unless success raise "Couldn't execute command #{command.inspect} (ssh.channel.exec)" end channel.on_data do |ch, data| + stdout_stream.push data stdout_data << data end channel.on_extended_data do |ch, type, data| + stderr_stream.push data stderr_data << data end channel.on_request('exit-status') do |ch, data| exit_code = data.read_long @@ -175,17 +189,61 @@ end end ssh.loop + stdout_stream.close + stderr_stream.close + #[stdout_data, stderr_data, exit_code, exit_signal] [stdout_data, stderr_data, exit_code] end def run_shell_command(command, outputs) command = evaluate_command_template(command, outputs) - Open3.capture3(command) + + stdout_data = '' + stderr_data = '' + exit_code = nil + + Open3.popen3(command) do |stdin, stdout, stderr, wait_thr| + mutex = Mutex.new + + th_out = Thread.start do + stdout_stream = create_stdout_stream + + stdout.each_line do |line| + mutex.synchronize do + stdout_stream.push line + end + + stdout_data << line + end + + stdout_stream.close + end + + th_err = Thread.start do + stderr_stream = create_stderr_stream + + stderr.each_line do |line| + mutex.synchronize do + stderr_stream.push line + end + stderr_data << line + end + + stderr_stream.close + end + + th_out.join + th_err.join + exit_code = wait_thr.value + end + + #[stdout_data, stderr_data, exit_code, exit_signal] + [stdout_data, stderr_data, exit_code] end def validate_command_template(name, command, outputs) command = command.undent if @command_options[:undent] trim_mode = @command_options[:trim_mode] @@ -230,20 +288,28 @@ Command: #{name.intense_blue} EOS end - def print_command_result(out, err, status) + def create_stdout_stream + Kumogata::StringStream.new do |line| + puts '1> '.intense_green + line + $stdout.flush + end + end + + def create_stderr_stream + Kumogata::StringStream.new do |line| + puts '2> '.intense_red + line + $stdout.flush + end + end + + def print_command_result(out, err, status) # XXX: status = status.to_i - dspout = (out || '').lines.map {|i| "1> ".intense_green + i }.join.chomp - dsperr = (err || '').lines.map {|i| "2> ".intense_red + i }.join.chomp puts <<-EOS -Status: #{status.zero? ? status : status.to_s.red}#{ - dspout.empty? ? '' : ("\n---\n" + dspout) -}#{ - dsperr.empty? ? '' : ("\n---\n" + dsperr) -} +Status: #{status.zero? ? status : status.to_s.red} EOS end def save_command_results(results) puts <<-EOS