lib/kontena/cli/helpers/exec_helper.rb in kontena-cli-1.4.0.pre1 vs lib/kontena/cli/helpers/exec_helper.rb in kontena-cli-1.4.0.pre2

- old
+ new

@@ -1,87 +1,190 @@ -require_relative '../../websocket/client' +require 'io/console' +require 'kontena-websocket-client' module Kontena::Cli::Helpers module ExecHelper - # @param [WebSocket::Client::Simple] ws - # @return [Thread] - def stream_stdin_to_ws(ws, tty: nil) - require 'io/console' - Thread.new { - if tty - STDIN.raw { - while char = STDIN.readpartial(1024) - ws.text(JSON.dump({ stdin: char })) - end - } - else - while char = STDIN.gets - ws.text(JSON.dump({ stdin: char })) + websocket_log_level = if ENV["DEBUG"] == 'websocket' + Logger::DEBUG + elsif ENV["DEBUG"] + Logger::INFO + else + Logger::WARN + end + + Kontena::Websocket::Logging.initialize_logger(STDERR, websocket_log_level) + + WEBSOCKET_CLIENT_OPTIONS = { + connect_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0, + open_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0, + ping_interval: ENV["EXCON_READ_TIMEOUT"] ? ENV["EXCON_READ_TIMEOUT"].to_f : 30.0, + ping_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0, + close_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0, + write_timeout: ENV["EXCON_WRITE_TIMEOUT"] ? ENV["EXCON_WRITE_TIMEOUT"].to_f : 5.0, + } + + # @param ws [Kontena::Websocket::Client] + # @param tty [Boolean] read stdin in raw mode, sending tty escapes for remote pty + # @raise [ArgumentError] not a tty + # @yield [data] + # @yieldparam data [String] data from stdin + # @raise [ArgumentError] not a tty + # @return EOF on stdin (!tty) + def read_stdin(tty: nil) + if tty + raise ArgumentError, "the input device is not a TTY" unless STDIN.tty? + + STDIN.raw { |io| + # we do not expect EOF on a TTY, ^D sends a tty escape to close the pty instead + loop do + # raises EOFError, SyscallError or IOError + yield io.readpartial(1024) end - ws.text(JSON.dump({ stdin: nil })) + } + else + # line-buffered + while line = STDIN.gets + yield line end - } + end end - # @param [Hash] msg - def handle_message(msg) - if msg.has_key?('exit') - if msg['message'] - exit_with_error(msg['message']) - else - exit msg['exit'].to_i + # @return [String] + def websocket_url(path, query = nil) + url = URI.parse(require_current_master.url) + url.scheme = url.scheme.sub('http', 'ws') + url.path = '/v1/' + path + url.query = (query && !query.empty?) ? URI.encode_www_form(query) : nil + url.to_s + end + + # @param ws [Kontena::Websocket::Client] + # @return [Integer] exit code + def websocket_exec_read(ws) + ws.read do |msg| + msg = JSON.parse(msg) + + logger.debug "websocket exec read: #{msg.inspect}" + + if msg.has_key?('exit') + # breaks the read loop + return msg['exit'].to_i + elsif msg.has_key?('stream') + if msg['stream'] == 'stdout' + $stdout << msg['chunk'] + else + $stderr << msg['chunk'] + end end - elsif msg.has_key?('stream') - if msg['stream'] == 'stdout' - $stdout << msg['chunk'] - else - $stderr << msg['chunk'] - end end end - # @param [Websocket::Frame::Incoming] msg - def parse_message(msg) - JSON.parse(msg.data) - rescue JSON::ParserError - nil + # @param ws [Kontena::Websocket::Client] + # @param msg [Hash] + def websocket_exec_write(ws, msg) + logger.debug "websocket exec write: #{msg.inspect}" + + ws.send(JSON.dump(msg)) end - # @param container_id [String] The container id + # Start thread to read from stdin, and write to websocket. + # Closes websocket on stdin read errors. + # + # @param ws [Kontena::Websocket::Client] + # @param tty [Boolean] + # @return [Thread] + def websocket_exec_write_thread(ws, tty: nil) + Thread.new do + begin + read_stdin(tty: tty) do |stdin| + websocket_exec_write(ws, 'stdin' => stdin) + end + websocket_exec_write(ws, 'stdin' => nil) # EOF + rescue => exc + logger.error exc + ws.close(1001, "stdin read #{exc.class}: #{exc}") + end + end + end + + # Connect to server websocket, send from stdin, and write out messages + # + # @param paths [String] + # @param options [Hash] @see Kontena::Websocket::Client + # @param cmd [Array<String>] command to execute # @param interactive [Boolean] Interactive TTY on/off # @param shell [Boolean] Shell on/of # @param tty [Boolean] TTY on/of - # @return [String] - def ws_url(container_id, interactive: false, shell: false, tty: false) - require 'uri' unless Object.const_defined?(:URI) - extend Kontena::Cli::Common unless self.respond_to?(:require_current_master) + # @return [Integer] exit code + def websocket_exec(path, cmd, interactive: false, shell: false, tty: false) + exit_status = nil + write_thread = nil - url = URI.parse(require_current_master.url) - url.scheme = url.scheme.sub('http', 'ws') - url.path = "/v1/containers/#{container_id}/exec" - if shell || interactive || tty - query = {} - query.merge!(interactive: true) if interactive - query.merge!(shell: true) if shell - query.merge!(tty: true) if tty - url.query = URI.encode_www_form(query) - end - url.to_s - end + query = {} + query[:interactive] = interactive if interactive + query[:shell] = shell if shell + query[:tty] = tty if tty - # @param [String] url - # @param [String] token - # @return [WebSocket::Client::Simple] - def connect(url, token) - options = { - headers: { + server = require_current_master + url = websocket_url(path, query) + token = require_token + options = WEBSOCKET_CLIENT_OPTIONS.dup + options[:headers] = { 'Authorization' => "Bearer #{token.access_token}" - } } - if ENV['SSL_IGNORE_ERRORS'].to_s == 'true' - options[:verify_mode] = ::OpenSSL::SSL::VERIFY_NONE + options[:ssl_params] = { + verify_mode: ENV['SSL_IGNORE_ERRORS'].to_s == 'true' ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER, + ca_file: server.ssl_cert_path, + } + options[:ssl_hostname] = server.ssl_subject_cn + + logger.debug { "websocket exec connect... #{url}" } + + # we do not expect CloseError, because the server will send an 'exit' message first, + # and we return before seeing the close frame + # TODO: handle HTTP 404 errors + Kontena::Websocket::Client.connect(url, **options) do |ws| + logger.debug { "websocket exec open" } + + # first frame contains exec command + websocket_exec_write(ws, 'cmd' => cmd) + + if interactive + # start new thread to write from stdin to websocket + write_thread = websocket_exec_write_thread(ws, tty: tty) + end + + # blocks reading from websocket, returns with exec exit code + exit_status = websocket_exec_read(ws) + + fail ws.close_reason unless exit_status end - Kontena::Websocket::Client.new(url, options) + + rescue Kontena::Websocket::Error => exc + exit_with_error(exc) + + rescue => exc + logger.error { "websocket exec error: #{exc}" } + raise + + else + logger.debug { "websocket exec exit: #{exit_status}"} + return exit_status + + ensure + if write_thread + write_thread.kill + write_thread.join + end + end + + # Execute command on container using websocket API. + # + # @param id [String] Container ID (grid/host/name) + # @param cmd [Array<String>] command to execute + # @return [Integer] exit code + def container_exec(id, cmd, **exec_options) + websocket_exec("containers/#{id}/exec", cmd, **exec_options) end end end