lib/ruby_skynet/connection.rb in ruby_skynet-0.6.0 vs lib/ruby_skynet/connection.rb in ruby_skynet-0.7.0

- old
+ new

@@ -10,10 +10,11 @@ # Handles connecting to Skynet Servers as a host:port pair # module RubySkynet class Connection include SyncAttr + include SemanticLogger::Loggable # Returns the underlying socket being used by a Connection instance attr_reader :socket # Default Pool configuration @@ -24,15 +25,10 @@ :timeout => 10, # Raise a Timeout exception if no connections are available after the :timeout seconds :idle_timeout => 600, # Renew a connection if it has been idle for this period of time } end - # Logging instance for the connection pool - sync_cattr_reader :logger do - SemanticLogger::Logger.new(self) - end - # For each server there is a connection pool keyed on the # server address: 'host:port' @@connection_pools = ThreadSafe::Hash.new # Returns a new RubySkynet connection to the server @@ -53,11 +49,11 @@ # # :connect_retry_interval [Float] # Number of seconds between connection retry attempts after the first failed attempt # Default: 0.5 def initialize(server, params = {}) - @logger = SemanticLogger::Logger.new("#{self.class.name}: #{server}") + self.logger = SemanticLogger["#{self.class.name} [#{server}]"] # User configurable options params[:read_timeout] ||= 60 params[:connect_timeout] ||= 30 params[:connect_retry_interval] ||= 0.1 @@ -69,34 +65,34 @@ # For each new connection perform the Skynet handshake params[:on_connect] = Proc.new do |socket| # Reset user_data on each connection socket.user_data = { :seq => 0, - :logger => @logger + :logger => logger } # Receive Service Handshake # Registered bool # Registered indicates the state of this service. If it is false, the connection will # close immediately and the client should look elsewhere for this service. # # ClientID string # ClientID is a UUID that is used by the client to identify itself in RPC requests. - @logger.debug "Waiting for Service Handshake" + logger.debug "Waiting for Service Handshake" service_handshake = Common.read_bson_document(socket) - @logger.trace 'Service Handshake', service_handshake + logger.trace 'Service Handshake', service_handshake # #TODO When a reconnect returns registered == false need to throw an exception # so that this host connection is not used registered = service_handshake['registered'] client_id = service_handshake['clientid'] socket.user_data[:client_id] = client_id # Send blank ClientHandshake client_handshake = { 'clientid' => client_id } - @logger.debug "Sending Client Handshake" - @logger.trace 'Client Handshake', client_handshake + logger.debug "Sending Client Handshake" + logger.trace 'Client Handshake', client_handshake socket.write(BSON.serialize(client_handshake).to_s) end # To prevent strange issues if user incorrectly supplies server names params.delete(:servers) @@ -121,95 +117,96 @@ # Returns the Hash result returned from the Skynet Service # # Raises RubySkynet::ProtocolError # Raises RubySkynet::SkynetException def rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false) - retry_count = 0 - header = nil - response = nil + logger.benchmark_info "Called #{skynet_name}.#{method_name}" do + retry_count = 0 + header = nil + response = nil + socket.retry_on_connection_failure do |socket| + header = { + 'servicemethod' => "#{skynet_name}.Forward", + 'seq' => socket.user_data[:seq] + } - socket.retry_on_connection_failure do |socket| - header = { - 'servicemethod' => "#{skynet_name}.Forward", - 'seq' => socket.user_data[:seq] - } + logger.debug "Sending Header" + logger.trace 'Header', header + socket.write(BSON.serialize(header).to_s) - @logger.debug "Sending Header" - @logger.trace 'Header', header - socket.write(BSON.serialize(header).to_s) - - # The parameters are placed in the request object in BSON serialized form - request = { - 'clientid' => socket.user_data[:client_id], - 'in' => BSON::Binary.new(BSON.serialize(parameters).to_s), - 'method' => method_name.to_s, - 'requestinfo' => { - 'requestid' => request_id, - # Increment retry count to indicate that the request may have been tried previously - 'retrycount' => retry_count, - # TODO: this should be forwarded along in case of services also - # being a client and calling additional services. If empty it will - # be stuffed with connecting address - 'originaddress' => '' + # The parameters are placed in the request object in BSON serialized form + request = { + 'clientid' => socket.user_data[:client_id], + 'in' => BSON::Binary.new(BSON.serialize(parameters).to_s), + 'method' => method_name.to_s, + 'requestinfo' => { + 'requestid' => request_id, + # Increment retry count to indicate that the request may have been tried previously + 'retrycount' => retry_count, + # TODO: this should be forwarded along in case of services also + # being a client and calling additional services. If empty it will + # be stuffed with connecting address + 'originaddress' => '' + } } - } - @logger.debug "Sending Request" - @logger.trace 'Request', request - @logger.trace 'Parameters:', parameters - socket.write(BSON.serialize(request).to_s) + logger.debug "Sending Request" + logger.trace 'Request', request + logger.trace 'Parameters:', parameters + socket.write(BSON.serialize(request).to_s) - # Since Send does not affect state on the server we can also retry reads - if idempotent - @logger.debug "Reading header from server" + # Since Send does not affect state on the server we can also retry reads + if idempotent + logger.debug "Reading header from server" + header = Common.read_bson_document(socket) + logger.debug 'Response Header', header + + # Read the BSON response document + logger.debug "Reading response from server" + response = Common.read_bson_document(socket) + logger.trace 'Response', response + end + end + + # Perform the read outside the retry block since a successful write + # means that the servers state may have been changed + unless idempotent + # Read header first as a separate BSON document + logger.debug "Reading header from server" header = Common.read_bson_document(socket) - @logger.debug 'Response Header', header + logger.debug 'Response Header', header # Read the BSON response document - @logger.debug "Reading response from server" + logger.debug "Reading response from server" response = Common.read_bson_document(socket) - @logger.trace 'Response', response + logger.trace 'Response', response end - end - # Perform the read outside the retry block since a successful write - # means that the servers state may have been changed - unless idempotent - # Read header first as a separate BSON document - @logger.debug "Reading header from server" - header = Common.read_bson_document(socket) - @logger.debug 'Response Header', header + # Ensure the sequence number in the response header matches the + # sequence number sent in the request + seq_no = header['seq'] + if seq_no != socket.user_data[:seq] + raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}") + end - # Read the BSON response document - @logger.debug "Reading response from server" - response = Common.read_bson_document(socket) - @logger.trace 'Response', response - end + # Increment Sequence number only on successful response + socket.user_data[:seq] += 1 - # Ensure the sequence number in the response header matches the - # sequence number sent in the request - seq_no = header['seq'] - if seq_no != socket.user_data[:seq] - raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}") - end + # If an error is returned from Skynet raise a Skynet exception + error = header['error'] + raise SkynetException.new(error) if error.to_s.length > 0 - # Increment Sequence number only on successful response - socket.user_data[:seq] += 1 + # If an error is returned from the service raise a Service exception + error = response['error'] + raise ServiceException.new(error) if error.to_s.length > 0 - # If an error is returned from Skynet raise a Skynet exception - error = header['error'] - raise SkynetException.new(error) if error.to_s.length > 0 - - # If an error is returned from the service raise a Service exception - error = response['error'] - raise ServiceException.new(error) if error.to_s.length > 0 - - # Return Value - # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized - result = BSON.deserialize(response['out']) - @logger.trace 'Return Value', result - result + # Return Value + # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized + result = BSON.deserialize(response['out']) + logger.trace 'Return Value', result + result + end end # Execute the supplied block with a connection from the pool def self.with_connection(server, params={}, &block) (@@connection_pools[server] ||= new_connection_pool(server, params)).with_connection(&block) @@ -225,14 +222,15 @@ # Returns a new connection pool for the specified server def self.new_connection_pool(server, params={}) # Connection pool configuration options config = pool_config.dup + logger = SemanticLogger::Logger.new("#{self.class.name} [#{server}]") + # Method to call to close idle connections config[:close_proc] = :close config[:logger] = logger - config[:name] = "Connection pool for #{server}" pool = GenePool.new(pool_config) do new(server, params) end @@ -240,10 +238,10 @@ RubySkynet.services.on_server_removed(server) do pool = @@connection_pools.delete(server) # Cannot close all the connections since they could still be in use pool.remove_idle(0) if pool #pool.close if pool - logger.debug "Connection pool for server:#{server} has been released" + logger.debug "Connection pool released" end pool end