lib/ruby_skynet/connection.rb in ruby_skynet-1.3.0.alpha3 vs lib/ruby_skynet/connection.rb in ruby_skynet-2.0.0.rc1

- old
+ new

@@ -1,6 +1,7 @@ require 'bson' +require 'gene_pool' require 'thread_safe' require 'resilient_socket' require 'sync_attr' # @@ -14,10 +15,24 @@ include SemanticLogger::Loggable # Returns the underlying socket being used by a Connection instance attr_reader :socket + # Default Pool configuration + sync_cattr_accessor :pool_config do + { + :pool_size => 30, # Maximum number of connections to any one server + :warn_timeout => 2, # Log a warning if no connections are available after the :warn_timeout seconds + :timeout => 10, # Raise a Timeout exception if no connections are available after the :timeout seconds + :idle_timeout => 600, # Renew a connection if it has been idle for this period of time + } + end + + # For each server there is a connection pool keyed on the + # server address: 'host:port' + @@connection_pools = ThreadSafe::Hash.new + # Returns a new RubySkynet connection to the server # # Parameters: # :read_timeout [Float] # Time in seconds to timeout on read @@ -33,12 +48,12 @@ # Default: 10 # # :connect_retry_interval [Float] # Number of seconds between connection retry attempts after the first failed attempt # Default: 0.5 - def initialize(servers, params = {}) - params = params.dup + def initialize(server, params = {}) + self.logger = SemanticLogger["#{self.class.name} [#{server}]"] # User configurable options params[:read_timeout] ||= 60 params[:connect_timeout] ||= 30 params[:connect_retry_interval] ||= 0.1 @@ -74,16 +89,16 @@ # Send blank ClientHandshake client_handshake = { 'clientid' => client_id } logger.debug "Sending Client Handshake" logger.trace 'Client Handshake', client_handshake - socket.write(BSON.serialize(client_handshake).to_s) + socket.write(client_handshake.to_bson) end # To prevent strange issues if user incorrectly supplies server names - params.delete(:server) - params[:servers] = servers + params.delete(:servers) + params[:server] = server @socket = ResilientSocket::TCPClient.new(params) end # Performs a synchronous call to a Skynet server @@ -114,16 +129,16 @@ 'seq' => socket.user_data[:seq] } logger.debug "Sending Header" logger.trace 'Header', header - socket.write(BSON.serialize(header).to_s) + socket.write(header.to_bson) # The parameters are placed in the request object in BSON serialized form request = { 'clientid' => socket.user_data[:client_id], - 'in' => BSON::Binary.new(BSON.serialize(parameters).to_s), + 'in' => BSON::Binary.new(parameters.to_bson), 'method' => method_name.to_s, 'requestinfo' => { 'requestid' => request_id, # Increment retry count to indicate that the request may have been tried previously 'retrycount' => retry_count, @@ -135,11 +150,11 @@ } logger.debug "Sending Request" logger.trace 'Request', request logger.trace 'Parameters:', parameters - socket.write(BSON.serialize(request).to_s) + socket.write(request.to_bson) # Since Send does not affect state on the server we can also retry reads if idempotent logger.debug "Reading header from server" header = Common.read_bson_document(socket) @@ -184,29 +199,54 @@ error = response['error'] raise ServiceException.new(error) if error.to_s.length > 0 # Return Value # The return value is inside the response object, it's a byte array of it's own and needs to be deserialized - result = BSON.deserialize(response['out']) + result = Hash.from_bson(StringIO.new(response['out'].data)) logger.trace 'Return Value', result result end end # Execute the supplied block with a connection from the pool def self.with_connection(server, params={}, &block) - conn = nil - begin - conn = new(server, params) - block.call(conn) - ensure - conn.close if conn - end + (@@connection_pools[server] ||= new_connection_pool(server, params)).with_connection(&block) end def close @socket.close if @socket end + ######################## + protected + + # Returns a new connection pool for the specified server + def self.new_connection_pool(server, params={}) + # Connection pool configuration options + config = pool_config.dup + + logger = SemanticLogger::Logger.new("#{self.class.name} [#{server}]") + + # Method to call to close idle connections + config[:close_proc] = :close + config[:logger] = logger + + pool = GenePool.new(pool_config) do + new(server, params) + end + + # Cleanup corresponding connection pool when a server terminates + RubySkynet.service_registry.on_server_removed(server) do + pool = @@connection_pools.delete(server) + # Cannot close all the connections since they could still be in use + pool.remove_idle(0) if pool + #pool.close if pool + logger.debug "Connection pool released" + end + + pool + end + end + end