lib/ruby_skynet/connection.rb in ruby_skynet-2.0.0.rc1 vs lib/ruby_skynet/connection.rb in ruby_skynet-2.0.0

- old
+ new

@@ -1,7 +1,6 @@ require 'bson' -require 'gene_pool' require 'thread_safe' require 'resilient_socket' require 'sync_attr' # @@ -15,24 +14,10 @@ include SemanticLogger::Loggable # Returns the underlying socket being used by a Connection instance attr_reader :socket - # Default Pool configuration - sync_cattr_accessor :pool_config do - { - :pool_size => 30, # Maximum number of connections to any one server - :warn_timeout => 2, # Log a warning if no connections are available after the :warn_timeout seconds - :timeout => 10, # Raise a Timeout exception if no connections are available after the :timeout seconds - :idle_timeout => 600, # Renew a connection if it has been idle for this period of time - } - end - - # For each server there is a connection pool keyed on the - # server address: 'host:port' - @@connection_pools = ThreadSafe::Hash.new - # Returns a new RubySkynet connection to the server # # Parameters: # :read_timeout [Float] # Time in seconds to timeout on read @@ -48,12 +33,12 @@ # Default: 10 # # :connect_retry_interval [Float] # Number of seconds between connection retry attempts after the first failed attempt # Default: 0.5 - def initialize(server, params = {}) - self.logger = SemanticLogger["#{self.class.name} [#{server}]"] + def initialize(servers, params = {}) + params = params.dup # User configurable options params[:read_timeout] ||= 60 params[:connect_timeout] ||= 30 params[:connect_retry_interval] ||= 0.1 @@ -93,12 +78,12 @@ logger.trace 'Client Handshake', client_handshake socket.write(client_handshake.to_bson) end # To prevent strange issues if user incorrectly supplies server names - params.delete(:servers) - params[:server] = server + params.delete(:server) + params[:servers] = servers @socket = ResilientSocket::TCPClient.new(params) end # Performs a synchronous call to a Skynet server @@ -207,46 +192,21 @@ end end # Execute the supplied block with a connection from the pool def self.with_connection(server, params={}, &block) - (@@connection_pools[server] ||= new_connection_pool(server, params)).with_connection(&block) + conn = nil + begin + conn = new(server, params) + block.call(conn) + ensure + conn.close if conn + end end def close @socket.close if @socket end - ######################## - protected - - # Returns a new connection pool for the specified server - def self.new_connection_pool(server, params={}) - # Connection pool configuration options - config = pool_config.dup - - logger = SemanticLogger::Logger.new("#{self.class.name} [#{server}]") - - # Method to call to close idle connections - config[:close_proc] = :close - config[:logger] = logger - - pool = GenePool.new(pool_config) do - new(server, params) - end - - # Cleanup corresponding connection pool when a server terminates - RubySkynet.service_registry.on_server_removed(server) do - pool = @@connection_pools.delete(server) - # Cannot close all the connections since they could still be in use - pool.remove_idle(0) if pool - #pool.close if pool - logger.debug "Connection pool released" - end - - pool - end - end - end