lib/kontena/cli/helpers/exec_helper.rb in kontena-cli-1.4.0.pre1 vs lib/kontena/cli/helpers/exec_helper.rb in kontena-cli-1.4.0.pre2
- old
+ new
@@ -1,87 +1,190 @@
-require_relative '../../websocket/client'
+require 'io/console'
+require 'kontena-websocket-client'
module Kontena::Cli::Helpers
module ExecHelper
- # @param [WebSocket::Client::Simple] ws
- # @return [Thread]
- def stream_stdin_to_ws(ws, tty: nil)
- require 'io/console'
- Thread.new {
- if tty
- STDIN.raw {
- while char = STDIN.readpartial(1024)
- ws.text(JSON.dump({ stdin: char }))
- end
- }
- else
- while char = STDIN.gets
- ws.text(JSON.dump({ stdin: char }))
+ websocket_log_level = if ENV["DEBUG"] == 'websocket'
+ Logger::DEBUG
+ elsif ENV["DEBUG"]
+ Logger::INFO
+ else
+ Logger::WARN
+ end
+
+ Kontena::Websocket::Logging.initialize_logger(STDERR, websocket_log_level)
+
+ WEBSOCKET_CLIENT_OPTIONS = {
+ connect_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
+ open_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
+ ping_interval: ENV["EXCON_READ_TIMEOUT"] ? ENV["EXCON_READ_TIMEOUT"].to_f : 30.0,
+ ping_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
+ close_timeout: ENV["EXCON_CONNECT_TIMEOUT"] ? ENV["EXCON_CONNECT_TIMEOUT"].to_f : 5.0,
+ write_timeout: ENV["EXCON_WRITE_TIMEOUT"] ? ENV["EXCON_WRITE_TIMEOUT"].to_f : 5.0,
+ }
+
+ # @param ws [Kontena::Websocket::Client]
+ # @param tty [Boolean] read stdin in raw mode, sending tty escapes for remote pty
+ # @raise [ArgumentError] not a tty
+ # @yield [data]
+ # @yieldparam data [String] data from stdin
+ # @raise [ArgumentError] not a tty
+ # @return EOF on stdin (!tty)
+ def read_stdin(tty: nil)
+ if tty
+ raise ArgumentError, "the input device is not a TTY" unless STDIN.tty?
+
+ STDIN.raw { |io|
+ # we do not expect EOF on a TTY, ^D sends a tty escape to close the pty instead
+ loop do
+ # raises EOFError, SyscallError or IOError
+ yield io.readpartial(1024)
end
- ws.text(JSON.dump({ stdin: nil }))
+ }
+ else
+ # line-buffered
+ while line = STDIN.gets
+ yield line
end
- }
+ end
end
- # @param [Hash] msg
- def handle_message(msg)
- if msg.has_key?('exit')
- if msg['message']
- exit_with_error(msg['message'])
- else
- exit msg['exit'].to_i
+ # @return [String]
+ def websocket_url(path, query = nil)
+ url = URI.parse(require_current_master.url)
+ url.scheme = url.scheme.sub('http', 'ws')
+ url.path = '/v1/' + path
+ url.query = (query && !query.empty?) ? URI.encode_www_form(query) : nil
+ url.to_s
+ end
+
+ # @param ws [Kontena::Websocket::Client]
+ # @return [Integer] exit code
+ def websocket_exec_read(ws)
+ ws.read do |msg|
+ msg = JSON.parse(msg)
+
+ logger.debug "websocket exec read: #{msg.inspect}"
+
+ if msg.has_key?('exit')
+ # breaks the read loop
+ return msg['exit'].to_i
+ elsif msg.has_key?('stream')
+ if msg['stream'] == 'stdout'
+ $stdout << msg['chunk']
+ else
+ $stderr << msg['chunk']
+ end
end
- elsif msg.has_key?('stream')
- if msg['stream'] == 'stdout'
- $stdout << msg['chunk']
- else
- $stderr << msg['chunk']
- end
end
end
- # @param [Websocket::Frame::Incoming] msg
- def parse_message(msg)
- JSON.parse(msg.data)
- rescue JSON::ParserError
- nil
+ # @param ws [Kontena::Websocket::Client]
+ # @param msg [Hash]
+ def websocket_exec_write(ws, msg)
+ logger.debug "websocket exec write: #{msg.inspect}"
+
+ ws.send(JSON.dump(msg))
end
- # @param container_id [String] The container id
+ # Start thread to read from stdin, and write to websocket.
+ # Closes websocket on stdin read errors.
+ #
+ # @param ws [Kontena::Websocket::Client]
+ # @param tty [Boolean]
+ # @return [Thread]
+ def websocket_exec_write_thread(ws, tty: nil)
+ Thread.new do
+ begin
+ read_stdin(tty: tty) do |stdin|
+ websocket_exec_write(ws, 'stdin' => stdin)
+ end
+ websocket_exec_write(ws, 'stdin' => nil) # EOF
+ rescue => exc
+ logger.error exc
+ ws.close(1001, "stdin read #{exc.class}: #{exc}")
+ end
+ end
+ end
+
+ # Connect to server websocket, send from stdin, and write out messages
+ #
+ # @param paths [String]
+ # @param options [Hash] @see Kontena::Websocket::Client
+ # @param cmd [Array<String>] command to execute
# @param interactive [Boolean] Interactive TTY on/off
# @param shell [Boolean] Shell on/of
# @param tty [Boolean] TTY on/of
- # @return [String]
- def ws_url(container_id, interactive: false, shell: false, tty: false)
- require 'uri' unless Object.const_defined?(:URI)
- extend Kontena::Cli::Common unless self.respond_to?(:require_current_master)
+ # @return [Integer] exit code
+ def websocket_exec(path, cmd, interactive: false, shell: false, tty: false)
+ exit_status = nil
+ write_thread = nil
- url = URI.parse(require_current_master.url)
- url.scheme = url.scheme.sub('http', 'ws')
- url.path = "/v1/containers/#{container_id}/exec"
- if shell || interactive || tty
- query = {}
- query.merge!(interactive: true) if interactive
- query.merge!(shell: true) if shell
- query.merge!(tty: true) if tty
- url.query = URI.encode_www_form(query)
- end
- url.to_s
- end
+ query = {}
+ query[:interactive] = interactive if interactive
+ query[:shell] = shell if shell
+ query[:tty] = tty if tty
- # @param [String] url
- # @param [String] token
- # @return [WebSocket::Client::Simple]
- def connect(url, token)
- options = {
- headers: {
+ server = require_current_master
+ url = websocket_url(path, query)
+ token = require_token
+ options = WEBSOCKET_CLIENT_OPTIONS.dup
+ options[:headers] = {
'Authorization' => "Bearer #{token.access_token}"
- }
}
- if ENV['SSL_IGNORE_ERRORS'].to_s == 'true'
- options[:verify_mode] = ::OpenSSL::SSL::VERIFY_NONE
+ options[:ssl_params] = {
+ verify_mode: ENV['SSL_IGNORE_ERRORS'].to_s == 'true' ? OpenSSL::SSL::VERIFY_NONE : OpenSSL::SSL::VERIFY_PEER,
+ ca_file: server.ssl_cert_path,
+ }
+ options[:ssl_hostname] = server.ssl_subject_cn
+
+ logger.debug { "websocket exec connect... #{url}" }
+
+ # we do not expect CloseError, because the server will send an 'exit' message first,
+ # and we return before seeing the close frame
+ # TODO: handle HTTP 404 errors
+ Kontena::Websocket::Client.connect(url, **options) do |ws|
+ logger.debug { "websocket exec open" }
+
+ # first frame contains exec command
+ websocket_exec_write(ws, 'cmd' => cmd)
+
+ if interactive
+ # start new thread to write from stdin to websocket
+ write_thread = websocket_exec_write_thread(ws, tty: tty)
+ end
+
+ # blocks reading from websocket, returns with exec exit code
+ exit_status = websocket_exec_read(ws)
+
+ fail ws.close_reason unless exit_status
end
- Kontena::Websocket::Client.new(url, options)
+
+ rescue Kontena::Websocket::Error => exc
+ exit_with_error(exc)
+
+ rescue => exc
+ logger.error { "websocket exec error: #{exc}" }
+ raise
+
+ else
+ logger.debug { "websocket exec exit: #{exit_status}"}
+ return exit_status
+
+ ensure
+ if write_thread
+ write_thread.kill
+ write_thread.join
+ end
+ end
+
+ # Execute command on container using websocket API.
+ #
+ # @param id [String] Container ID (grid/host/name)
+ # @param cmd [Array<String>] command to execute
+ # @return [Integer] exit code
+ def container_exec(id, cmd, **exec_options)
+ websocket_exec("containers/#{id}/exec", cmd, **exec_options)
end
end
end