lib/ruby_skynet/connection.rb in ruby_skynet-1.2.7 vs lib/ruby_skynet/connection.rb in ruby_skynet-1.3.0.alpha1
- old
+ new
@@ -1,7 +1,6 @@
require 'bson'
-require 'gene_pool'
require 'thread_safe'
require 'resilient_socket'
require 'sync_attr'
#
@@ -15,24 +14,10 @@
include SemanticLogger::Loggable
# Returns the underlying socket being used by a Connection instance
attr_reader :socket
- # Default Pool configuration
- sync_cattr_accessor :pool_config do
- {
- :pool_size => 30, # Maximum number of connections to any one server
- :warn_timeout => 2, # Log a warning if no connections are available after the :warn_timeout seconds
- :timeout => 10, # Raise a Timeout exception if no connections are available after the :timeout seconds
- :idle_timeout => 600, # Renew a connection if it has been idle for this period of time
- }
- end
-
- # For each server there is a connection pool keyed on the
- # server address: 'host:port'
- @@connection_pools = ThreadSafe::Hash.new
-
# Returns a new RubySkynet connection to the server
#
# Parameters:
# :read_timeout [Float]
# Time in seconds to timeout on read
@@ -48,12 +33,12 @@
# Default: 10
#
# :connect_retry_interval [Float]
# Number of seconds between connection retry attempts after the first failed attempt
# Default: 0.5
- def initialize(server, params = {})
- self.logger = SemanticLogger["#{self.class.name} [#{server}]"]
+ def initialize(servers, params = {})
+ params = params.dup
# User configurable options
params[:read_timeout] ||= 60
params[:connect_timeout] ||= 30
params[:connect_retry_interval] ||= 0.1
@@ -93,12 +78,12 @@
logger.trace 'Client Handshake', client_handshake
socket.write(BSON.serialize(client_handshake).to_s)
end
# To prevent strange issues if user incorrectly supplies server names
- params.delete(:servers)
- params[:server] = server
+ params.delete(:server)
+ params[:servers] = servers
@socket = ResilientSocket::TCPClient.new(params)
end
# Performs a synchronous call to a Skynet server
@@ -207,46 +192,21 @@
end
end
# Execute the supplied block with a connection from the pool
def self.with_connection(server, params={}, &block)
- (@@connection_pools[server] ||= new_connection_pool(server, params)).with_connection(&block)
+ conn = nil
+ begin
+ conn = new(server, params)
+ block.call(conn)
+ ensure
+ conn.close if conn
+ end
end
def close
@socket.close if @socket
end
- ########################
- protected
-
- # Returns a new connection pool for the specified server
- def self.new_connection_pool(server, params={})
- # Connection pool configuration options
- config = pool_config.dup
-
- logger = SemanticLogger::Logger.new("#{self.class.name} [#{server}]")
-
- # Method to call to close idle connections
- config[:close_proc] = :close
- config[:logger] = logger
-
- pool = GenePool.new(pool_config) do
- new(server, params)
- end
-
- # Cleanup corresponding connection pool when a server terminates
- RubySkynet.service_registry.on_server_removed(server) do
- pool = @@connection_pools.delete(server)
- # Cannot close all the connections since they could still be in use
- pool.remove_idle(0) if pool
- #pool.close if pool
- logger.debug "Connection pool released"
- end
-
- pool
- end
-
end
-
end