lib/ruby_skynet/connection.rb in ruby_skynet-1.3.0.alpha3 vs lib/ruby_skynet/connection.rb in ruby_skynet-2.0.0.rc1
- old
+ new
@@ -1,6 +1,7 @@
require 'bson'
+require 'gene_pool'
require 'thread_safe'
require 'resilient_socket'
require 'sync_attr'
#
@@ -14,10 +15,24 @@
include SemanticLogger::Loggable
# Returns the underlying socket being used by a Connection instance
attr_reader :socket
+ # Default Pool configuration
+ sync_cattr_accessor :pool_config do
+ {
+ :pool_size => 30, # Maximum number of connections to any one server
+ :warn_timeout => 2, # Log a warning if no connections are available after the :warn_timeout seconds
+ :timeout => 10, # Raise a Timeout exception if no connections are available after the :timeout seconds
+ :idle_timeout => 600, # Renew a connection if it has been idle for this period of time
+ }
+ end
+
+ # For each server there is a connection pool keyed on the
+ # server address: 'host:port'
+ @@connection_pools = ThreadSafe::Hash.new
+
# Returns a new RubySkynet connection to the server
#
# Parameters:
# :read_timeout [Float]
# Time in seconds to timeout on read
@@ -33,12 +48,12 @@
# Default: 10
#
# :connect_retry_interval [Float]
# Number of seconds between connection retry attempts after the first failed attempt
# Default: 0.5
- def initialize(servers, params = {})
- params = params.dup
+ def initialize(server, params = {})
+ self.logger = SemanticLogger["#{self.class.name} [#{server}]"]
# User configurable options
params[:read_timeout] ||= 60
params[:connect_timeout] ||= 30
params[:connect_retry_interval] ||= 0.1
@@ -74,16 +89,16 @@
# Send blank ClientHandshake
client_handshake = { 'clientid' => client_id }
logger.debug "Sending Client Handshake"
logger.trace 'Client Handshake', client_handshake
- socket.write(BSON.serialize(client_handshake).to_s)
+ socket.write(client_handshake.to_bson)
end
# To prevent strange issues if user incorrectly supplies server names
- params.delete(:server)
- params[:servers] = servers
+ params.delete(:servers)
+ params[:server] = server
@socket = ResilientSocket::TCPClient.new(params)
end
# Performs a synchronous call to a Skynet server
@@ -114,16 +129,16 @@
'seq' => socket.user_data[:seq]
}
logger.debug "Sending Header"
logger.trace 'Header', header
- socket.write(BSON.serialize(header).to_s)
+ socket.write(header.to_bson)
# The parameters are placed in the request object in BSON serialized form
request = {
'clientid' => socket.user_data[:client_id],
- 'in' => BSON::Binary.new(BSON.serialize(parameters).to_s),
+ 'in' => BSON::Binary.new(parameters.to_bson),
'method' => method_name.to_s,
'requestinfo' => {
'requestid' => request_id,
# Increment retry count to indicate that the request may have been tried previously
'retrycount' => retry_count,
@@ -135,11 +150,11 @@
}
logger.debug "Sending Request"
logger.trace 'Request', request
logger.trace 'Parameters:', parameters
- socket.write(BSON.serialize(request).to_s)
+ socket.write(request.to_bson)
# Since Send does not affect state on the server we can also retry reads
if idempotent
logger.debug "Reading header from server"
header = Common.read_bson_document(socket)
@@ -184,29 +199,54 @@
error = response['error']
raise ServiceException.new(error) if error.to_s.length > 0
# Return Value
# The return value is inside the response object, it's a byte array of it's own and needs to be deserialized
- result = BSON.deserialize(response['out'])
+ result = Hash.from_bson(StringIO.new(response['out'].data))
logger.trace 'Return Value', result
result
end
end
# Execute the supplied block with a connection from the pool
def self.with_connection(server, params={}, &block)
- conn = nil
- begin
- conn = new(server, params)
- block.call(conn)
- ensure
- conn.close if conn
- end
+ (@@connection_pools[server] ||= new_connection_pool(server, params)).with_connection(&block)
end
def close
@socket.close if @socket
end
+ ########################
+ protected
+
+ # Returns a new connection pool for the specified server
+ def self.new_connection_pool(server, params={})
+ # Connection pool configuration options
+ config = pool_config.dup
+
+ logger = SemanticLogger::Logger.new("#{self.class.name} [#{server}]")
+
+ # Method to call to close idle connections
+ config[:close_proc] = :close
+ config[:logger] = logger
+
+ pool = GenePool.new(pool_config) do
+ new(server, params)
+ end
+
+ # Cleanup corresponding connection pool when a server terminates
+ RubySkynet.service_registry.on_server_removed(server) do
+ pool = @@connection_pools.delete(server)
+ # Cannot close all the connections since they could still be in use
+ pool.remove_idle(0) if pool
+ #pool.close if pool
+ logger.debug "Connection pool released"
+ end
+
+ pool
+ end
+
end
+
end