lib/arachni/rpc/em/server.rb in arachni-rpc-em-0.1.3 vs lib/arachni/rpc/em/server.rb in arachni-rpc-em-0.2
- old
+ new
@@ -8,175 +8,41 @@
module Arachni
module RPC
module EM
+require_relative 'server/handler'
+
+# EventMachine-based RPC server.
#
-# EventMachine-based RPC server class.
-#
-# It's capable of:
-# - performing and handling a few thousands requests per second (depending on call size, network conditions and the like)
-# - TLS encryption
-# - asynchronous and synchronous requests
-# - handling asynchronous methods that require a block
-#
-# @author: Tasos "Zapotek" Laskos <tasos.laskos@gmail.com>
-#
+# @author Tasos "Zapotek" Laskos <tasos.laskos@gmail.com>
class Server
include ::Arachni::RPC::Exceptions
- #
- # Handles EventMachine's connection stuff.
- #
- # It's responsible for TLS, serializing, transmitting and receiving objects,
- # as well as authenticating the client using the token.
- #
- # It also handles and forwards exceptions.
- #
- # @author: Tasos "Zapotek" Laskos <tasos.laskos@gmail.com>
- #
- class Proxy < EventMachine::Connection
- include ::Arachni::RPC::EM::Protocol
- include ::Arachni::RPC::Exceptions
- include ::Arachni::RPC::EM::ConnectionUtilities
-
- INACTIVITY_TIMEOUT = 10
-
- attr_reader :request
-
- def initialize( server )
- super
- @server = server
- @opts = server.opts
-
- assume_server_role!
-
- @id = nil
- @request = nil
-
- # do not tolerate long periods of
- # inactivity in order to avoid zombie connections
- set_comm_inactivity_timeout( INACTIVITY_TIMEOUT )
- end
-
- # starts TLS
- def post_init
- start_ssl
- end
-
- def unbind
- end_ssl
- end
-
- def log( severity, progname, msg )
- sev_sym = Logger.const_get( severity.to_s.upcase.to_sym )
- @server.logger.add( sev_sym, msg, progname )
- end
-
- #
- # Handles requests and sends back the responses.
- #
- # @param [Arachni::RPC::EM::Request] req
- #
- def receive_request( req )
- @request = req
-
- # the method call may block a little so tell EventMachine to
- # stick it in its own thread.
- res = Response.new
- peer = peer_ip_addr
-
- begin
- # token-based authentication
- authenticate!
-
- # grab the result of the method call
- res.merge!( @server.call( self ) )
-
- # handle exceptions and convert them to a simple hash,
- # ready to be passed to the client.
- rescue Exception => e
-
- type = ''
-
- # if it's an RPC exception pass the type along as is
- if e.rpc_exception?
- type = e.class.name.split( ':' )[-1]
- # otherwise set it to a RemoteExeption
- else
- type = 'RemoteException'
- end
-
- res.obj = {
- 'exception' => e.to_s,
- 'backtrace' => e.backtrace,
- 'type' => type
- }
-
- msg = "#{e.to_s}\n#{e.backtrace.join( "\n" )}"
- @server.logger.error( 'Exception' ){ msg + " [on behalf of #{peer}]" }
- end
-
- #
- # pass the result of the RPC call back to the client
- # along with the callback ID but *only* if it wan't async
- # because server.call() will have already taken care of it
- #
- send_response( res ) if !res.async?
- end
-
- #
- # Authenticates the client based on the token in the request.
- #
- # It will raise an exception if the token doesn't check-out.
- #
- def authenticate!
- if !valid_token?( @request.token )
-
- msg = 'Token missing or invalid while calling: ' + @request.message
-
- @server.logger.error( 'Authenticator' ){
- msg + " [on behalf of #{peer_ip_addr}]"
- }
-
- fail InvalidToken.new( msg )
- end
- end
-
- #
- # Compares the authentication token in the param with the one of the server.
- #
- # @param [String] token
- #
- # @return [Bool]
- #
- def valid_token?( token )
- token == @server.token
- end
-
- end
-
+ # @return [String] Authentication token.
attr_reader :token
+
+ # @return [Hash] Configuration options.
attr_reader :opts
+
+ # @return [Logger]
attr_reader :logger
#
# Starts EventMachine and the RPC server.
#
- # opts example:
+ # @example Example options:
#
# {
# :host => 'localhost',
# :port => 7331,
#
# # optional authentication token, if it doesn't match the one
# # set on the server-side you'll be getting exceptions.
# :token => 'superdupersecret',
#
# # optional serializer (defaults to YAML)
- # # see the 'serializer' method at:
- # # http://eventmachine.rubyforge.org/EventMachine/Protocols/ObjectProtocol.html#M000369
# :serializer => Marshal,
#
# # serializer to use if the first choice fails
# :fallback_serializer => YAML,
#
@@ -191,109 +57,134 @@
# # SSL certificate
# :ssl_cert => cwd + '/../spec/pems/client/cert.pem'
# }
#
# @param [Hash] opts
+ # @option opts [String] :host Hostname/IP address.
+ # @option opts [Integer] :port Port number.
+ # @option opts [String] :socket Path to UNIX domain socket.
+ # @option opts [String] :token Optional authentication token.
+ # @option opts [.dump, .load] :serializer (YAML)
+ # Serializer to use for message transmission.
+ # @option opts [.dump, .load] :fallback_serializer
+ # Optional fallback serializer to be used when the primary one fails.
+ # @option opts [Integer] :max_retries
+ # How many times to retry failed requests.
+ # @option opts [String] :ssl_ca SSL CA certificate.
+ # @option opts [String] :ssl_pkey SSL private key.
+ # @option opts [String] :ssl_cert SSL certificate.
#
def initialize( opts )
- @opts = opts
+ @opts = opts
if @opts[:ssl_pkey] && @opts[:ssl_cert]
if !File.exist?( @opts[:ssl_pkey] )
- raise 'Could not find private key at: ' + @opts[:ssl_pkey]
+ raise "Could not find private key at: #{@opts[:ssl_pkey]}"
end
if !File.exist?( @opts[:ssl_cert] )
- raise 'Could not find certificate at: ' + @opts[:ssl_cert]
+ raise "Could not find certificate at: #{@opts[:ssl_cert]}"
end
end
@token = @opts[:token]
@logger = ::Logger.new( STDOUT )
@logger.level = Logger::INFO
@host, @port = @opts[:host], @opts[:port]
+ @socket = @opts[:socket]
+ if !@socket && !(@host || @port)
+ fail ArgumentError, 'Needs either a :socket or :host and :port options.'
+ end
+
+ @port = @port.to_i
+
clear_handlers
end
#
- # This is a way to identify methods that pass their result to a block
- # instead of simply returning them (which is the most usual operation of async methods.
+ # @example
#
- # So no need to change your coding conventions to fit the RPC stuff,
- # you can just decide dynamically based on the plethora of data which Ruby provides
- # by its 'Method' class.
- #
# server.add_async_check do |method|
# #
# # Must return 'true' for async and 'false' for sync.
# #
# # Very simple check here...
# #
# 'async' == method.name.to_s.split( '_' )[0]
# end
#
- # @param [Proc] &block
+ # @param [Block] block
+ # Block to identify methods that pass their result to a block instead of
+ # simply returning them (which is the most usual operation of async methods).
#
def add_async_check( &block )
@async_checks << block
end
#
- # Adds a handler by name:
+ # @example
#
# server.add_handler( 'myclass', MyClass.new )
#
- # @param [String] name name via which to make the object available over RPC
- # @param [Object] obj object instance
+ # @param [String] name
+ # Name by which to make the object available over RPC.
+ # @param [Object] obj Instantiated server object to expose.
#
def add_handler( name, obj )
- @objects[name] = obj
- @methods[name] = Set.new # no lookup overhead please :)
+ @objects[name] = obj
+ @methods[name] = Set.new
@async_methods[name] = Set.new
obj.class.public_instance_methods( false ).each do |method|
- @methods[name] << method.to_s
+ @methods[name] << method.to_s
@async_methods[name] << method.to_s if async_check( obj.method( method ) )
end
end
+ # Clears all handlers and their associated information like methods and
+ # async check blocks.
#
- # Clears all handlers and their associated information like methods
- # and async check blocks.
- #
+ # @see #add_handler
+ # @see #add_async_check
def clear_handlers
@objects = {}
@methods = {}
@async_checks = []
@async_methods = {}
end
- #
- # Runs the server and blocks.
- #
+ # Runs the server and blocks while EM ir running.
def run
Arachni::RPC::EM.schedule { start }
Arachni::RPC::EM.block
end
- #
# Starts the server but does not block.
- #
def start
- @logger.info( 'System' ){ "RPC Server started." }
- @logger.info( 'System' ){ "Listening on #{@host}:#{@port}" }
+ @logger.info( 'System' ){ 'RPC Server started.' }
+ @logger.info( 'System' ) do
+ interface = @socket ? @socket : "#{@host}:#{@port}"
+ "Listening on #{interface}"
+ end
- ::EM.start_server( @host, @port, Proxy, self )
+ opts = @socket ? @socket : [@host, @port]
+ ::EM.start_server( *[opts, Handler, self].flatten )
end
+ # @note If the called method is asynchronous it will be sent by this method
+ # directly, otherwise it will be handled by the {Handler}.
+ #
+ # @param [Handler] connection
+ # Connection with request information.
+ #
+ # @return [Arachni::RPC::Response]
def call( connection )
-
- req = connection.request
+ req = connection.request
peer_ip_addr = connection.peer_ip_addr
expr, args = req.message, req.args
meth_name, obj_name = parse_expr( expr )
@@ -309,44 +200,44 @@
msg = "Trying to access non-public method '#{meth_name}'."
@logger.error( 'Call' ){ msg + " [on behalf of #{peer_ip_addr}]" }
raise InvalidMethod.new( msg )
end
- # the proxy needs to know whether this is an async call because if it
- # is we'll have already send the response.
+ # The handler needs to know if this is an async call because if it is
+ # we'll have already send the response and it doesn't need to do
+ # transmit anything.
res = Response.new
res.async! if async?( obj_name, meth_name )
- if !res.async?
- res.obj = @objects[obj_name].send( meth_name.to_sym, *args )
- else
+ if res.async?
@objects[obj_name].send( meth_name.to_sym, *args ) do |obj|
res.obj = obj
connection.send_response( res )
end
+ else
+ res.obj = @objects[obj_name].send( meth_name.to_sym, *args )
end
res
end
- #
# @return [TrueClass]
- #
def alive?
true
end
- #
# Shuts down the server after 2 seconds
- #
def shutdown
wait_for = 2
@logger.info( 'System' ){ "Shutting down in #{wait_for} seconds..." }
- # don't die before returning
- ::EM.add_timer( wait_for ) { ::EM.stop }
+ # Don't die before returning...
+ ::EM.add_timer( wait_for ) do
+ File.unlink( @socket ) if @socket
+ ::EM.stop
+ end
true
end
private
@@ -356,10 +247,9 @@
def async_check( method )
@async_checks.each { |check| return true if check.call( method ) }
false
end
-
def log_call( peer_ip_addr, expr, *args )
msg = "#{expr}"
# this should be in a @logger.debug call but it'll get out of sync