lib/arachni/rpc/em/server.rb in arachni-rpc-em-0.1.3 vs lib/arachni/rpc/em/server.rb in arachni-rpc-em-0.2

- old
+ new

@@ -8,175 +8,41 @@ module Arachni module RPC module EM +require_relative 'server/handler' + +# EventMachine-based RPC server. # -# EventMachine-based RPC server class. -# -# It's capable of: -# - performing and handling a few thousands requests per second (depending on call size, network conditions and the like) -# - TLS encryption -# - asynchronous and synchronous requests -# - handling asynchronous methods that require a block -# -# @author: Tasos "Zapotek" Laskos <tasos.laskos@gmail.com> -# +# @author Tasos "Zapotek" Laskos <tasos.laskos@gmail.com> class Server include ::Arachni::RPC::Exceptions - # - # Handles EventMachine's connection stuff. - # - # It's responsible for TLS, serializing, transmitting and receiving objects, - # as well as authenticating the client using the token. - # - # It also handles and forwards exceptions. - # - # @author: Tasos "Zapotek" Laskos <tasos.laskos@gmail.com> - # - class Proxy < EventMachine::Connection - include ::Arachni::RPC::EM::Protocol - include ::Arachni::RPC::Exceptions - include ::Arachni::RPC::EM::ConnectionUtilities - - INACTIVITY_TIMEOUT = 10 - - attr_reader :request - - def initialize( server ) - super - @server = server - @opts = server.opts - - assume_server_role! - - @id = nil - @request = nil - - # do not tolerate long periods of - # inactivity in order to avoid zombie connections - set_comm_inactivity_timeout( INACTIVITY_TIMEOUT ) - end - - # starts TLS - def post_init - start_ssl - end - - def unbind - end_ssl - end - - def log( severity, progname, msg ) - sev_sym = Logger.const_get( severity.to_s.upcase.to_sym ) - @server.logger.add( sev_sym, msg, progname ) - end - - # - # Handles requests and sends back the responses. - # - # @param [Arachni::RPC::EM::Request] req - # - def receive_request( req ) - @request = req - - # the method call may block a little so tell EventMachine to - # stick it in its own thread. - res = Response.new - peer = peer_ip_addr - - begin - # token-based authentication - authenticate! - - # grab the result of the method call - res.merge!( @server.call( self ) ) - - # handle exceptions and convert them to a simple hash, - # ready to be passed to the client. - rescue Exception => e - - type = '' - - # if it's an RPC exception pass the type along as is - if e.rpc_exception? - type = e.class.name.split( ':' )[-1] - # otherwise set it to a RemoteExeption - else - type = 'RemoteException' - end - - res.obj = { - 'exception' => e.to_s, - 'backtrace' => e.backtrace, - 'type' => type - } - - msg = "#{e.to_s}\n#{e.backtrace.join( "\n" )}" - @server.logger.error( 'Exception' ){ msg + " [on behalf of #{peer}]" } - end - - # - # pass the result of the RPC call back to the client - # along with the callback ID but *only* if it wan't async - # because server.call() will have already taken care of it - # - send_response( res ) if !res.async? - end - - # - # Authenticates the client based on the token in the request. - # - # It will raise an exception if the token doesn't check-out. - # - def authenticate! - if !valid_token?( @request.token ) - - msg = 'Token missing or invalid while calling: ' + @request.message - - @server.logger.error( 'Authenticator' ){ - msg + " [on behalf of #{peer_ip_addr}]" - } - - fail InvalidToken.new( msg ) - end - end - - # - # Compares the authentication token in the param with the one of the server. - # - # @param [String] token - # - # @return [Bool] - # - def valid_token?( token ) - token == @server.token - end - - end - + # @return [String] Authentication token. attr_reader :token + + # @return [Hash] Configuration options. attr_reader :opts + + # @return [Logger] attr_reader :logger # # Starts EventMachine and the RPC server. # - # opts example: + # @example Example options: # # { # :host => 'localhost', # :port => 7331, # # # optional authentication token, if it doesn't match the one # # set on the server-side you'll be getting exceptions. # :token => 'superdupersecret', # # # optional serializer (defaults to YAML) - # # see the 'serializer' method at: - # # http://eventmachine.rubyforge.org/EventMachine/Protocols/ObjectProtocol.html#M000369 # :serializer => Marshal, # # # serializer to use if the first choice fails # :fallback_serializer => YAML, # @@ -191,109 +57,134 @@ # # SSL certificate # :ssl_cert => cwd + '/../spec/pems/client/cert.pem' # } # # @param [Hash] opts + # @option opts [String] :host Hostname/IP address. + # @option opts [Integer] :port Port number. + # @option opts [String] :socket Path to UNIX domain socket. + # @option opts [String] :token Optional authentication token. + # @option opts [.dump, .load] :serializer (YAML) + # Serializer to use for message transmission. + # @option opts [.dump, .load] :fallback_serializer + # Optional fallback serializer to be used when the primary one fails. + # @option opts [Integer] :max_retries + # How many times to retry failed requests. + # @option opts [String] :ssl_ca SSL CA certificate. + # @option opts [String] :ssl_pkey SSL private key. + # @option opts [String] :ssl_cert SSL certificate. # def initialize( opts ) - @opts = opts + @opts = opts if @opts[:ssl_pkey] && @opts[:ssl_cert] if !File.exist?( @opts[:ssl_pkey] ) - raise 'Could not find private key at: ' + @opts[:ssl_pkey] + raise "Could not find private key at: #{@opts[:ssl_pkey]}" end if !File.exist?( @opts[:ssl_cert] ) - raise 'Could not find certificate at: ' + @opts[:ssl_cert] + raise "Could not find certificate at: #{@opts[:ssl_cert]}" end end @token = @opts[:token] @logger = ::Logger.new( STDOUT ) @logger.level = Logger::INFO @host, @port = @opts[:host], @opts[:port] + @socket = @opts[:socket] + if !@socket && !(@host || @port) + fail ArgumentError, 'Needs either a :socket or :host and :port options.' + end + + @port = @port.to_i + clear_handlers end # - # This is a way to identify methods that pass their result to a block - # instead of simply returning them (which is the most usual operation of async methods. + # @example # - # So no need to change your coding conventions to fit the RPC stuff, - # you can just decide dynamically based on the plethora of data which Ruby provides - # by its 'Method' class. - # # server.add_async_check do |method| # # # # Must return 'true' for async and 'false' for sync. # # # # Very simple check here... # # # 'async' == method.name.to_s.split( '_' )[0] # end # - # @param [Proc] &block + # @param [Block] block + # Block to identify methods that pass their result to a block instead of + # simply returning them (which is the most usual operation of async methods). # def add_async_check( &block ) @async_checks << block end # - # Adds a handler by name: + # @example # # server.add_handler( 'myclass', MyClass.new ) # - # @param [String] name name via which to make the object available over RPC - # @param [Object] obj object instance + # @param [String] name + # Name by which to make the object available over RPC. + # @param [Object] obj Instantiated server object to expose. # def add_handler( name, obj ) - @objects[name] = obj - @methods[name] = Set.new # no lookup overhead please :) + @objects[name] = obj + @methods[name] = Set.new @async_methods[name] = Set.new obj.class.public_instance_methods( false ).each do |method| - @methods[name] << method.to_s + @methods[name] << method.to_s @async_methods[name] << method.to_s if async_check( obj.method( method ) ) end end + # Clears all handlers and their associated information like methods and + # async check blocks. # - # Clears all handlers and their associated information like methods - # and async check blocks. - # + # @see #add_handler + # @see #add_async_check def clear_handlers @objects = {} @methods = {} @async_checks = [] @async_methods = {} end - # - # Runs the server and blocks. - # + # Runs the server and blocks while EM ir running. def run Arachni::RPC::EM.schedule { start } Arachni::RPC::EM.block end - # # Starts the server but does not block. - # def start - @logger.info( 'System' ){ "RPC Server started." } - @logger.info( 'System' ){ "Listening on #{@host}:#{@port}" } + @logger.info( 'System' ){ 'RPC Server started.' } + @logger.info( 'System' ) do + interface = @socket ? @socket : "#{@host}:#{@port}" + "Listening on #{interface}" + end - ::EM.start_server( @host, @port, Proxy, self ) + opts = @socket ? @socket : [@host, @port] + ::EM.start_server( *[opts, Handler, self].flatten ) end + # @note If the called method is asynchronous it will be sent by this method + # directly, otherwise it will be handled by the {Handler}. + # + # @param [Handler] connection + # Connection with request information. + # + # @return [Arachni::RPC::Response] def call( connection ) - - req = connection.request + req = connection.request peer_ip_addr = connection.peer_ip_addr expr, args = req.message, req.args meth_name, obj_name = parse_expr( expr ) @@ -309,44 +200,44 @@ msg = "Trying to access non-public method '#{meth_name}'." @logger.error( 'Call' ){ msg + " [on behalf of #{peer_ip_addr}]" } raise InvalidMethod.new( msg ) end - # the proxy needs to know whether this is an async call because if it - # is we'll have already send the response. + # The handler needs to know if this is an async call because if it is + # we'll have already send the response and it doesn't need to do + # transmit anything. res = Response.new res.async! if async?( obj_name, meth_name ) - if !res.async? - res.obj = @objects[obj_name].send( meth_name.to_sym, *args ) - else + if res.async? @objects[obj_name].send( meth_name.to_sym, *args ) do |obj| res.obj = obj connection.send_response( res ) end + else + res.obj = @objects[obj_name].send( meth_name.to_sym, *args ) end res end - # # @return [TrueClass] - # def alive? true end - # # Shuts down the server after 2 seconds - # def shutdown wait_for = 2 @logger.info( 'System' ){ "Shutting down in #{wait_for} seconds..." } - # don't die before returning - ::EM.add_timer( wait_for ) { ::EM.stop } + # Don't die before returning... + ::EM.add_timer( wait_for ) do + File.unlink( @socket ) if @socket + ::EM.stop + end true end private @@ -356,10 +247,9 @@ def async_check( method ) @async_checks.each { |check| return true if check.call( method ) } false end - def log_call( peer_ip_addr, expr, *args ) msg = "#{expr}" # this should be in a @logger.debug call but it'll get out of sync