lib/cosmos/io/json_drb.rb in cosmos-3.5.1 vs lib/cosmos/io/json_drb.rb in cosmos-3.5.2

- old
+ new

@@ -1,376 +1,376 @@ -# encoding: ascii-8bit - -# Copyright 2014 Ball Aerospace & Technologies Corp. -# All Rights Reserved. -# -# This program is free software; you can modify and/or redistribute it -# under the terms of the GNU Lesser General Public License -# as published by the Free Software Foundation; version 3 with -# attribution addendums as found in the LICENSE.txt - -require 'thread' -require 'socket' -require 'json' -require 'drb/acl' -require 'drb/drb' -require 'set' -require 'cosmos/io/json_rpc' - -module Cosmos - - # JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface - # for both internal and external tools to access the COSMOS server. It - # provides methods to install an access control list to control access to the - # API. It also limits the available methods to a known list of allowable API - # methods. - class JsonDRb - MINIMUM_REQUEST_TIME = 0.0001 - - @@debug = false - - # @return [Integer] The number of JSON-RPC requests processed - attr_accessor :request_count - # @return [Array<String>] List of methods that should be allowed - attr_accessor :method_whitelist - # @return [ACL] The access control list - attr_accessor :acl - - def initialize - @listen_socket = nil - @thread = nil - @acl = nil - @object = nil - @method_whitelist = nil - @request_count = 0 - @request_times = [] - @request_times_index = 0 - @request_mutex = Mutex.new - @client_sockets = [] - @client_threads = [] - @client_mutex = Mutex.new - @thread_reader, @thread_writer = IO.pipe - end - - # Returns the number of connected clients - # @return [Integer] The number of connected clients - def num_clients - @client_threads.length - end - - # Stops the DRb service by closing the socket and the processing thread - def stop_service - Cosmos.kill_thread(self, @thread) - @thread = nil - Cosmos.close_socket(@listen_socket) - @listen_socket = nil - client_threads = nil - @client_mutex.synchronize do - @client_sockets.each do |client_socket| - Cosmos.close_socket(client_socket) - end - client_threads = @client_threads.clone - end - - # This cannot be inside of the client_mutex or the threads will not - # be able to shutdown because they will stick on the client_mutex - client_threads.each do |client_thread| - Cosmos.kill_thread(self, client_thread) - end - - @client_mutex.synchronize do - @client_threads.clear - @client_sockets.clear - end - end - - # Gracefully kill the thread - def graceful_kill - @thread_writer.write('.') if @thread - end - - # @param hostname [String] The host to start the service on - # @param port [Integer] The port number to listen for connections - # @param object [Object] The object to send the DRb requests to. This - # object must either include the Cosmos::Script module or be the - # CmdTlmServer. - def start_service(hostname = nil, port = nil, object = nil) - if hostname and port and object - @object = object - hostname = '127.0.0.1'.freeze if (hostname.to_s.upcase == 'LOCALHOST'.freeze) - - # Create a socket to accept connections from clients - begin - @listen_socket = TCPServer.new(hostname, port) - @listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows? - # The address is use error is pretty typical if an existing - # CmdTlmServer is running so explicitly rescue this - rescue Errno::EADDRINUSE - raise "Error binding to port #{port}.\n" + - "Either another application is using this port\n" + - "or the operating system is being slow cleaning up.\n" + - "Make sure all sockets/streams are closed in all applications,\n" + - "wait 1 minute and try again." - # Something else went wrong which is fatal - rescue => error - Logger.error "JsonDRb listen thread unable to be created.\n#{error.formatted}" - Cosmos.handle_fatal_exception(error) - end - - # Start the listen thread which accepts connections - @thread = Thread.new do - begin - while true - begin - socket = @listen_socket.accept_nonblock - rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK - read_ready, _ = IO.select([@listen_socket, @thread_reader]) - if read_ready and read_ready.include?(@thread_reader) - begin - # Thread should be killed - Cleanout thread_reader first - # Don't let this break anything else though - @thread_reader.read(1) - rescue Exception - # Oh well - create a clean pipe in case we need one - @thread_reader, @thread_writer = IO.pipe - end - break - else - retry - end - end - - if @acl and !@acl.allow_socket?(socket) - Cosmos.close_socket(socket) - next - end - # Create new thread for new connection - create_client_thread(socket) - end - rescue Exception => error - Logger.error "JsonDRb listen thread unexpectedly died.\n#{error.formatted}" - Cosmos.handle_fatal_exception(error) - end - end - elsif hostname or port or object - raise "0 or 3 parameters must be given" - else - # Client - Noop - end - end - - # @return [Thread] The server thread listening for incoming requests - def thread - @thread - end - - # Adds a request time to the list. A request time consists of the amount of - # time to receive the request, process it, and send the response. These - # times are used by the {#average_request_time} method to calculate an - # average request time. - # - # @param request_time [Float] Time in seconds for the data transmission - def add_request_time(request_time) - @request_mutex.synchronize do - request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME - @request_times[@request_times_index] = request_time - @request_times_index += 1 - @request_times_index = 0 if @request_times_index >= 100 - end - end - - # @return [Float] The average time in seconds for a JSON DRb request to be - # processed and the response sent. - def average_request_time - avg = 0 - @request_mutex.synchronize do - avg = @request_times.mean - end - avg - end - - # @param socket [Socket] The socket to the client - # @param data [String] Binary data which has already been read from the - # socket. - # @return [String] The request message - def self.receive_message(socket, data) - self.get_at_least_x_bytes_of_data(socket, data, 4) - if data.length >= 4 - length = data[0..3].unpack('N'.freeze)[0] - data.replace(data[4..-1]) - else - return nil - end - - self.get_at_least_x_bytes_of_data(socket, data, length) - if data.length >= length - message = data[0..(length - 1)] - data.replace(data[length..-1]) - return message - else - return nil - end - end - - # @param socket [Socket] The socket to the client - # @param current_data [String] Binary data read from the socket - # @param required_num_bytes [Integer] The minimum number of bytes to read - # before returning - def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes) - while (current_data.length < required_num_bytes) - begin - data = socket.recv_nonblock(65535) - if data.length == 0 - current_data.replace('') - return - end - current_data << data - rescue IO::WaitReadable - IO.fast_select([socket], nil, nil, nil) - retry - end - end - end - - # @param socket [Socket] The socket to the client - # @param data [String] Binary data to send to the socket - # @param send_timeout [Float] The number of seconds to wait for the send to - # complete - def self.send_data(socket, data, send_timeout = 10.0) - num_bytes_to_send = data.length + 4 - total_bytes_sent = 0 - bytes_sent = 0 - data_to_send = [data.length].pack('N'.freeze) << data.clone - - loop do - begin - bytes_sent = socket.write_nonblock(data_to_send[total_bytes_sent..-1]) - rescue Errno::EAGAIN, Errno::EWOULDBLOCK - result = IO.fast_select(nil, [socket], nil, send_timeout) - if result - retry - else - raise Timeout::Error, "Send Timeout" - end - end - total_bytes_sent += bytes_sent - break if total_bytes_sent >= num_bytes_to_send - end - end - - # @return [Boolean] Whether debug messages are enabled - def self.debug? - @@debug - end - - # @param value [Boolean] Whether to enable debug messages - def self.debug=(value) - @@debug = value - end - - protected - - # Creates a new Thread to service the JSON DRb requests from the client. - # - # @param socket [Socket] The socket which the server accepted from the - # client. - def create_client_thread(socket) - socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) - socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1) - - Thread.new(socket) do |my_socket| - @client_mutex.synchronize do - @client_sockets << my_socket - @client_threads << Thread.current - end - - data = '' - begin - while true - begin - request_data = JsonDRb.receive_message(my_socket, data) - start_time = Time.now - @request_count += 1 - rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ENOTSOCK - # Socket was closed - break - end - if request_data - break unless process_request(request_data, my_socket, start_time) - else - # Socket was closed by client - break - end - end - rescue Exception => error - Logger.error "JsonDrb client thread unexpectedly died.\n#{error.formatted}" - end - - @client_mutex.synchronize do - Cosmos.close_socket(my_socket) - @client_sockets.delete(my_socket) - @client_threads.delete(Thread.current) - end - end - end - - # Process the JSON request data, execute the method, and send the response. - # - # @param request_data [String] The JSON encoded request - # @param my_socket [Socket] The socket to send the response out on - # @param start_time [Time] The time when the initial request was received - def process_request(request_data, my_socket, start_time) - STDOUT.puts request_data if JsonDRb.debug? - begin - request = JsonRpcRequest.from_json(request_data) - response = nil - - if (@method_whitelist and @method_whitelist.include?(request.method)) or - (!@method_whitelist and !JsonRpcRequest::DANGEROUS_METHODS.include?(request.method)) - begin - result = @object.send(request.method.intern, *request.params) - if request.id - response = JsonRpcSuccessResponse.new(result, request.id) - end - rescue Exception => error - if request.id - if NoMethodError === error - response = JsonRpcErrorResponse.new( - JsonRpcError.new(-32601, "Method not found", error), request.id) - elsif ArgumentError === error - response = JsonRpcErrorResponse.new( - JsonRpcError.new(-32602, "Invalid params", error), request.id) - else - response = JsonRpcErrorResponse.new( - JsonRpcError.new(-1, error.message, error), request.id) - end - end - end - else - if request.id - response = JsonRpcErrorResponse.new( - JsonRpcError.new(-1, "Cannot call unauthorized methods"), request.id) - end - end - process_response(response, my_socket, start_time) if response - rescue => error - response = JsonRpcErrorResponse.new(JsonRpcError.new(-32600, "Invalid Request", error), nil) - process_response(response, my_socket, start_time) - end - true - end - - def process_response(response, socket, start_time) - response_data = response.to_json(:allow_nan => true) - STDOUT.puts response_data if JsonDRb.debug? - JsonDRb.send_data(socket, response_data) - end_time = Time.now - request_time = end_time - start_time - add_request_time(request_time) - rescue - # Socket was closed? - return false - end - - end -end - +# encoding: ascii-8bit + +# Copyright 2014 Ball Aerospace & Technologies Corp. +# All Rights Reserved. +# +# This program is free software; you can modify and/or redistribute it +# under the terms of the GNU Lesser General Public License +# as published by the Free Software Foundation; version 3 with +# attribution addendums as found in the LICENSE.txt + +require 'thread' +require 'socket' +require 'json' +require 'drb/acl' +require 'drb/drb' +require 'set' +require 'cosmos/io/json_rpc' + +module Cosmos + + # JsonDRb implements the JSON-RPC 2.0 Specification to provide an interface + # for both internal and external tools to access the COSMOS server. It + # provides methods to install an access control list to control access to the + # API. It also limits the available methods to a known list of allowable API + # methods. + class JsonDRb + MINIMUM_REQUEST_TIME = 0.0001 + + @@debug = false + + # @return [Integer] The number of JSON-RPC requests processed + attr_accessor :request_count + # @return [Array<String>] List of methods that should be allowed + attr_accessor :method_whitelist + # @return [ACL] The access control list + attr_accessor :acl + + def initialize + @listen_socket = nil + @thread = nil + @acl = nil + @object = nil + @method_whitelist = nil + @request_count = 0 + @request_times = [] + @request_times_index = 0 + @request_mutex = Mutex.new + @client_sockets = [] + @client_threads = [] + @client_mutex = Mutex.new + @thread_reader, @thread_writer = IO.pipe + end + + # Returns the number of connected clients + # @return [Integer] The number of connected clients + def num_clients + @client_threads.length + end + + # Stops the DRb service by closing the socket and the processing thread + def stop_service + Cosmos.kill_thread(self, @thread) + @thread = nil + Cosmos.close_socket(@listen_socket) + @listen_socket = nil + client_threads = nil + @client_mutex.synchronize do + @client_sockets.each do |client_socket| + Cosmos.close_socket(client_socket) + end + client_threads = @client_threads.clone + end + + # This cannot be inside of the client_mutex or the threads will not + # be able to shutdown because they will stick on the client_mutex + client_threads.each do |client_thread| + Cosmos.kill_thread(self, client_thread) + end + + @client_mutex.synchronize do + @client_threads.clear + @client_sockets.clear + end + end + + # Gracefully kill the thread + def graceful_kill + @thread_writer.write('.') if @thread + end + + # @param hostname [String] The host to start the service on + # @param port [Integer] The port number to listen for connections + # @param object [Object] The object to send the DRb requests to. This + # object must either include the Cosmos::Script module or be the + # CmdTlmServer. + def start_service(hostname = nil, port = nil, object = nil) + if hostname and port and object + @object = object + hostname = '127.0.0.1'.freeze if (hostname.to_s.upcase == 'LOCALHOST'.freeze) + + # Create a socket to accept connections from clients + begin + @listen_socket = TCPServer.new(hostname, port) + @listen_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_REUSEADDR, 1) unless Kernel.is_windows? + # The address is use error is pretty typical if an existing + # CmdTlmServer is running so explicitly rescue this + rescue Errno::EADDRINUSE + raise "Error binding to port #{port}.\n" + + "Either another application is using this port\n" + + "or the operating system is being slow cleaning up.\n" + + "Make sure all sockets/streams are closed in all applications,\n" + + "wait 1 minute and try again." + # Something else went wrong which is fatal + rescue => error + Logger.error "JsonDRb listen thread unable to be created.\n#{error.formatted}" + Cosmos.handle_fatal_exception(error) + end + + # Start the listen thread which accepts connections + @thread = Thread.new do + begin + while true + begin + socket = @listen_socket.accept_nonblock + rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR, Errno::EWOULDBLOCK + read_ready, _ = IO.select([@listen_socket, @thread_reader]) + if read_ready and read_ready.include?(@thread_reader) + begin + # Thread should be killed - Cleanout thread_reader first + # Don't let this break anything else though + @thread_reader.read(1) + rescue Exception + # Oh well - create a clean pipe in case we need one + @thread_reader, @thread_writer = IO.pipe + end + break + else + retry + end + end + + if @acl and !@acl.allow_socket?(socket) + Cosmos.close_socket(socket) + next + end + # Create new thread for new connection + create_client_thread(socket) + end + rescue Exception => error + Logger.error "JsonDRb listen thread unexpectedly died.\n#{error.formatted}" + Cosmos.handle_fatal_exception(error) + end + end + elsif hostname or port or object + raise "0 or 3 parameters must be given" + else + # Client - Noop + end + end + + # @return [Thread] The server thread listening for incoming requests + def thread + @thread + end + + # Adds a request time to the list. A request time consists of the amount of + # time to receive the request, process it, and send the response. These + # times are used by the {#average_request_time} method to calculate an + # average request time. + # + # @param request_time [Float] Time in seconds for the data transmission + def add_request_time(request_time) + @request_mutex.synchronize do + request_time = MINIMUM_REQUEST_TIME if request_time < MINIMUM_REQUEST_TIME + @request_times[@request_times_index] = request_time + @request_times_index += 1 + @request_times_index = 0 if @request_times_index >= 100 + end + end + + # @return [Float] The average time in seconds for a JSON DRb request to be + # processed and the response sent. + def average_request_time + avg = 0 + @request_mutex.synchronize do + avg = @request_times.mean + end + avg + end + + # @param socket [Socket] The socket to the client + # @param data [String] Binary data which has already been read from the + # socket. + # @return [String] The request message + def self.receive_message(socket, data) + self.get_at_least_x_bytes_of_data(socket, data, 4) + if data.length >= 4 + length = data[0..3].unpack('N'.freeze)[0] + data.replace(data[4..-1]) + else + return nil + end + + self.get_at_least_x_bytes_of_data(socket, data, length) + if data.length >= length + message = data[0..(length - 1)] + data.replace(data[length..-1]) + return message + else + return nil + end + end + + # @param socket [Socket] The socket to the client + # @param current_data [String] Binary data read from the socket + # @param required_num_bytes [Integer] The minimum number of bytes to read + # before returning + def self.get_at_least_x_bytes_of_data(socket, current_data, required_num_bytes) + while (current_data.length < required_num_bytes) + begin + data = socket.recv_nonblock(65535) + if data.length == 0 + current_data.replace('') + return + end + current_data << data + rescue IO::WaitReadable + IO.fast_select([socket], nil, nil, nil) + retry + end + end + end + + # @param socket [Socket] The socket to the client + # @param data [String] Binary data to send to the socket + # @param send_timeout [Float] The number of seconds to wait for the send to + # complete + def self.send_data(socket, data, send_timeout = 10.0) + num_bytes_to_send = data.length + 4 + total_bytes_sent = 0 + bytes_sent = 0 + data_to_send = [data.length].pack('N'.freeze) << data.clone + + loop do + begin + bytes_sent = socket.write_nonblock(data_to_send[total_bytes_sent..-1]) + rescue Errno::EAGAIN, Errno::EWOULDBLOCK + result = IO.fast_select(nil, [socket], nil, send_timeout) + if result + retry + else + raise Timeout::Error, "Send Timeout" + end + end + total_bytes_sent += bytes_sent + break if total_bytes_sent >= num_bytes_to_send + end + end + + # @return [Boolean] Whether debug messages are enabled + def self.debug? + @@debug + end + + # @param value [Boolean] Whether to enable debug messages + def self.debug=(value) + @@debug = value + end + + protected + + # Creates a new Thread to service the JSON DRb requests from the client. + # + # @param socket [Socket] The socket which the server accepted from the + # client. + def create_client_thread(socket) + socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) + socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1) + + Thread.new(socket) do |my_socket| + @client_mutex.synchronize do + @client_sockets << my_socket + @client_threads << Thread.current + end + + data = '' + begin + while true + begin + request_data = JsonDRb.receive_message(my_socket, data) + start_time = Time.now + @request_count += 1 + rescue Errno::ECONNRESET, Errno::ECONNABORTED, Errno::ENOTSOCK + # Socket was closed + break + end + if request_data + break unless process_request(request_data, my_socket, start_time) + else + # Socket was closed by client + break + end + end + rescue Exception => error + Logger.error "JsonDrb client thread unexpectedly died.\n#{error.formatted}" + end + + @client_mutex.synchronize do + Cosmos.close_socket(my_socket) + @client_sockets.delete(my_socket) + @client_threads.delete(Thread.current) + end + end + end + + # Process the JSON request data, execute the method, and send the response. + # + # @param request_data [String] The JSON encoded request + # @param my_socket [Socket] The socket to send the response out on + # @param start_time [Time] The time when the initial request was received + def process_request(request_data, my_socket, start_time) + STDOUT.puts request_data if JsonDRb.debug? + begin + request = JsonRpcRequest.from_json(request_data) + response = nil + + if (@method_whitelist and @method_whitelist.include?(request.method)) or + (!@method_whitelist and !JsonRpcRequest::DANGEROUS_METHODS.include?(request.method)) + begin + result = @object.send(request.method.intern, *request.params) + if request.id + response = JsonRpcSuccessResponse.new(result, request.id) + end + rescue Exception => error + if request.id + if NoMethodError === error + response = JsonRpcErrorResponse.new( + JsonRpcError.new(-32601, "Method not found", error), request.id) + elsif ArgumentError === error + response = JsonRpcErrorResponse.new( + JsonRpcError.new(-32602, "Invalid params", error), request.id) + else + response = JsonRpcErrorResponse.new( + JsonRpcError.new(-1, error.message, error), request.id) + end + end + end + else + if request.id + response = JsonRpcErrorResponse.new( + JsonRpcError.new(-1, "Cannot call unauthorized methods"), request.id) + end + end + process_response(response, my_socket, start_time) if response + rescue => error + response = JsonRpcErrorResponse.new(JsonRpcError.new(-32600, "Invalid Request", error), nil) + process_response(response, my_socket, start_time) + end + true + end + + def process_response(response, socket, start_time) + response_data = response.to_json(:allow_nan => true) + STDOUT.puts response_data if JsonDRb.debug? + JsonDRb.send_data(socket, response_data) + end_time = Time.now + request_time = end_time - start_time + add_request_time(request_time) + rescue + # Socket was closed? + return false + end + + end +end +