lib/ruby_skynet/connection.rb in ruby_skynet-0.6.0 vs lib/ruby_skynet/connection.rb in ruby_skynet-0.7.0
- old
+ new
@@ -10,10 +10,11 @@
# Handles connecting to Skynet Servers as a host:port pair
#
module RubySkynet
class Connection
include SyncAttr
+ include SemanticLogger::Loggable
# Returns the underlying socket being used by a Connection instance
attr_reader :socket
# Default Pool configuration
@@ -24,15 +25,10 @@
:timeout => 10, # Raise a Timeout exception if no connections are available after the :timeout seconds
:idle_timeout => 600, # Renew a connection if it has been idle for this period of time
}
end
- # Logging instance for the connection pool
- sync_cattr_reader :logger do
- SemanticLogger::Logger.new(self)
- end
-
# For each server there is a connection pool keyed on the
# server address: 'host:port'
@@connection_pools = ThreadSafe::Hash.new
# Returns a new RubySkynet connection to the server
@@ -53,11 +49,11 @@
#
# :connect_retry_interval [Float]
# Number of seconds between connection retry attempts after the first failed attempt
# Default: 0.5
def initialize(server, params = {})
- @logger = SemanticLogger::Logger.new("#{self.class.name}: #{server}")
+ self.logger = SemanticLogger["#{self.class.name} [#{server}]"]
# User configurable options
params[:read_timeout] ||= 60
params[:connect_timeout] ||= 30
params[:connect_retry_interval] ||= 0.1
@@ -69,34 +65,34 @@
# For each new connection perform the Skynet handshake
params[:on_connect] = Proc.new do |socket|
# Reset user_data on each connection
socket.user_data = {
:seq => 0,
- :logger => @logger
+ :logger => logger
}
# Receive Service Handshake
# Registered bool
# Registered indicates the state of this service. If it is false, the connection will
# close immediately and the client should look elsewhere for this service.
#
# ClientID string
# ClientID is a UUID that is used by the client to identify itself in RPC requests.
- @logger.debug "Waiting for Service Handshake"
+ logger.debug "Waiting for Service Handshake"
service_handshake = Common.read_bson_document(socket)
- @logger.trace 'Service Handshake', service_handshake
+ logger.trace 'Service Handshake', service_handshake
# #TODO When a reconnect returns registered == false need to throw an exception
# so that this host connection is not used
registered = service_handshake['registered']
client_id = service_handshake['clientid']
socket.user_data[:client_id] = client_id
# Send blank ClientHandshake
client_handshake = { 'clientid' => client_id }
- @logger.debug "Sending Client Handshake"
- @logger.trace 'Client Handshake', client_handshake
+ logger.debug "Sending Client Handshake"
+ logger.trace 'Client Handshake', client_handshake
socket.write(BSON.serialize(client_handshake).to_s)
end
# To prevent strange issues if user incorrectly supplies server names
params.delete(:servers)
@@ -121,95 +117,96 @@
# Returns the Hash result returned from the Skynet Service
#
# Raises RubySkynet::ProtocolError
# Raises RubySkynet::SkynetException
def rpc_call(request_id, skynet_name, method_name, parameters, idempotent=false)
- retry_count = 0
- header = nil
- response = nil
+ logger.benchmark_info "Called #{skynet_name}.#{method_name}" do
+ retry_count = 0
+ header = nil
+ response = nil
+ socket.retry_on_connection_failure do |socket|
+ header = {
+ 'servicemethod' => "#{skynet_name}.Forward",
+ 'seq' => socket.user_data[:seq]
+ }
- socket.retry_on_connection_failure do |socket|
- header = {
- 'servicemethod' => "#{skynet_name}.Forward",
- 'seq' => socket.user_data[:seq]
- }
+ logger.debug "Sending Header"
+ logger.trace 'Header', header
+ socket.write(BSON.serialize(header).to_s)
- @logger.debug "Sending Header"
- @logger.trace 'Header', header
- socket.write(BSON.serialize(header).to_s)
-
- # The parameters are placed in the request object in BSON serialized form
- request = {
- 'clientid' => socket.user_data[:client_id],
- 'in' => BSON::Binary.new(BSON.serialize(parameters).to_s),
- 'method' => method_name.to_s,
- 'requestinfo' => {
- 'requestid' => request_id,
- # Increment retry count to indicate that the request may have been tried previously
- 'retrycount' => retry_count,
- # TODO: this should be forwarded along in case of services also
- # being a client and calling additional services. If empty it will
- # be stuffed with connecting address
- 'originaddress' => ''
+ # The parameters are placed in the request object in BSON serialized form
+ request = {
+ 'clientid' => socket.user_data[:client_id],
+ 'in' => BSON::Binary.new(BSON.serialize(parameters).to_s),
+ 'method' => method_name.to_s,
+ 'requestinfo' => {
+ 'requestid' => request_id,
+ # Increment retry count to indicate that the request may have been tried previously
+ 'retrycount' => retry_count,
+ # TODO: this should be forwarded along in case of services also
+ # being a client and calling additional services. If empty it will
+ # be stuffed with connecting address
+ 'originaddress' => ''
+ }
}
- }
- @logger.debug "Sending Request"
- @logger.trace 'Request', request
- @logger.trace 'Parameters:', parameters
- socket.write(BSON.serialize(request).to_s)
+ logger.debug "Sending Request"
+ logger.trace 'Request', request
+ logger.trace 'Parameters:', parameters
+ socket.write(BSON.serialize(request).to_s)
- # Since Send does not affect state on the server we can also retry reads
- if idempotent
- @logger.debug "Reading header from server"
+ # Since Send does not affect state on the server we can also retry reads
+ if idempotent
+ logger.debug "Reading header from server"
+ header = Common.read_bson_document(socket)
+ logger.debug 'Response Header', header
+
+ # Read the BSON response document
+ logger.debug "Reading response from server"
+ response = Common.read_bson_document(socket)
+ logger.trace 'Response', response
+ end
+ end
+
+ # Perform the read outside the retry block since a successful write
+ # means that the servers state may have been changed
+ unless idempotent
+ # Read header first as a separate BSON document
+ logger.debug "Reading header from server"
header = Common.read_bson_document(socket)
- @logger.debug 'Response Header', header
+ logger.debug 'Response Header', header
# Read the BSON response document
- @logger.debug "Reading response from server"
+ logger.debug "Reading response from server"
response = Common.read_bson_document(socket)
- @logger.trace 'Response', response
+ logger.trace 'Response', response
end
- end
- # Perform the read outside the retry block since a successful write
- # means that the servers state may have been changed
- unless idempotent
- # Read header first as a separate BSON document
- @logger.debug "Reading header from server"
- header = Common.read_bson_document(socket)
- @logger.debug 'Response Header', header
+ # Ensure the sequence number in the response header matches the
+ # sequence number sent in the request
+ seq_no = header['seq']
+ if seq_no != socket.user_data[:seq]
+ raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}")
+ end
- # Read the BSON response document
- @logger.debug "Reading response from server"
- response = Common.read_bson_document(socket)
- @logger.trace 'Response', response
- end
+ # Increment Sequence number only on successful response
+ socket.user_data[:seq] += 1
- # Ensure the sequence number in the response header matches the
- # sequence number sent in the request
- seq_no = header['seq']
- if seq_no != socket.user_data[:seq]
- raise ProtocolError.new("Incorrect Response received, expected seq=#{socket.user_data[:seq]}, received: #{header.inspect}")
- end
+ # If an error is returned from Skynet raise a Skynet exception
+ error = header['error']
+ raise SkynetException.new(error) if error.to_s.length > 0
- # Increment Sequence number only on successful response
- socket.user_data[:seq] += 1
+ # If an error is returned from the service raise a Service exception
+ error = response['error']
+ raise ServiceException.new(error) if error.to_s.length > 0
- # If an error is returned from Skynet raise a Skynet exception
- error = header['error']
- raise SkynetException.new(error) if error.to_s.length > 0
-
- # If an error is returned from the service raise a Service exception
- error = response['error']
- raise ServiceException.new(error) if error.to_s.length > 0
-
- # Return Value
- # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized
- result = BSON.deserialize(response['out'])
- @logger.trace 'Return Value', result
- result
+ # Return Value
+ # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized
+ result = BSON.deserialize(response['out'])
+ logger.trace 'Return Value', result
+ result
+ end
end
# Execute the supplied block with a connection from the pool
def self.with_connection(server, params={}, &block)
(@@connection_pools[server] ||= new_connection_pool(server, params)).with_connection(&block)
@@ -225,14 +222,15 @@
# Returns a new connection pool for the specified server
def self.new_connection_pool(server, params={})
# Connection pool configuration options
config = pool_config.dup
+ logger = SemanticLogger::Logger.new("#{self.class.name} [#{server}]")
+
# Method to call to close idle connections
config[:close_proc] = :close
config[:logger] = logger
- config[:name] = "Connection pool for #{server}"
pool = GenePool.new(pool_config) do
new(server, params)
end
@@ -240,10 +238,10 @@
RubySkynet.services.on_server_removed(server) do
pool = @@connection_pools.delete(server)
# Cannot close all the connections since they could still be in use
pool.remove_idle(0) if pool
#pool.close if pool
- logger.debug "Connection pool for server:#{server} has been released"
+ logger.debug "Connection pool released"
end
pool
end