lib/z_k/pool.rb in zk-0.6.5 vs lib/z_k/pool.rb in zk-0.7.1
- old
+ new
@@ -4,13 +4,19 @@
attr_reader :connections #:nodoc:
def initialize
@state = :init
- @connections = []
- @connections.extend(MonitorMixin)
- @checkin_cond = @connections.new_cond
+ @mutex = Monitor.new
+ @checkin_cond = @mutex.new_cond
+
+ @connections = [] # all connections we control
+ @pool = [] # currently available connections
+
+ # this is required for 1.8.7 compatibility
+ @on_connected_subs = {}
+ @on_connected_subs.extend(MonitorMixin)
end
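
A minimal standalone sketch of the locking pattern 0.7.1 moves to: a dedicated Monitor owns both the lock and the checkin condition variable, rather than the @connections Array being extended with MonitorMixin. Class and method names below are illustrative, not taken from the gem.

require 'monitor'

class TinyPool # hypothetical, for illustration only
  def initialize
    @mutex        = Monitor.new
    @checkin_cond = @mutex.new_cond
    @pool         = []
  end

  def checkin(obj)
    @mutex.synchronize do
      @pool << obj
      @checkin_cond.signal # wake one thread blocked in checkout
    end
  end

  def checkout
    @mutex.synchronize do
      @checkin_cond.wait_while { @pool.empty? } # releases @mutex while waiting
      @pool.shift
    end
  end
end
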
# has close_all! been called on this ConnectionPool ?
def closed?
@state == :closed
@@ -32,11 +38,11 @@
end
# close all the connections on the pool
# @param [Boolean] graceful (optional) allow the checked-out connections to come back first?
def close_all!
- synchronize do
+ @mutex.synchronize do
return unless open?
@state = :closing
@checkin_cond.wait_until { (@pool.size == @connections.length) or closed? }
@@ -45,11 +51,11 @@
end
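
Caller-side sketch of the graceful shutdown path (hedged: assumes the Bounded pool class, its public with_connection/close_all! API, and a ZooKeeper server reachable at localhost:2181): close_all! waits on @checkin_cond until every checked-out connection is back in the pool.

require 'zk'

pool = ZK::Pool::Bounded.new('localhost:2181', :min_clients => 1, :max_clients => 5)

worker = Thread.new do
  pool.with_connection do |zk|
    zk.exists?('/zk-pool-example') # connection is checked back in when this block returns
    sleep 0.5
  end
end

sleep 0.1       # let the worker check a connection out first
pool.close_all! # blocks until @pool.size == @connections.length, then closes
worker.join
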
# calls close! on all connection objects, whether or not they're back in the pool
# this is DANGEROUS!
def force_close! #:nodoc:
- synchronize do
+ @mutex.synchronize do
return if (closed? or forced?)
@state = :forced
@pool.clear
@@ -97,20 +103,20 @@
connection.__send__(meth, *args, &block)
end
end
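
The truncated hunk above shows the forwarding call; a hedged sketch of the delegation idiom it belongs to (the enclosing method definition is not visible here, so this reconstruction is an assumption):

# forward any unknown method to a temporarily checked-out connection
def method_missing(meth, *args, &block)
  with_connection do |connection|
    connection.__send__(meth, *args, &block)
  end
end

# which lets callers treat the pool like a single client, e.g.
#   pool.exists?('/some/path')  # checkout, forward, checkin
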
def size #:nodoc:
- @pool.size
+ @mutex.synchronize { @pool.size }
end
def pool_state #:nodoc:
@state
end
protected
def synchronize
- @connections.synchronize { yield }
+ @mutex.synchronize { yield }
end
def assert_open!
raise Exceptions::PoolIsShuttingDownException unless open?
end
@@ -138,57 +144,63 @@
@min_clients = Integer(opts.delete(:min_clients))
@max_clients = Integer(opts.delete(:max_clients))
@connection_timeout = opts.delete(:timeout)
- # for compatibility w/ ClientPool we'll use @connections for synchronization
- @pool = [] # currently available connections
+ @count_waiters = 0
- synchronize do
+ @mutex.synchronize do
populate_pool!(@min_clients)
@state = :open
end
end
# returns the current number of allocated clients in the pool (not
# available clients)
def size
- @connections.length
+ @mutex.synchronize { @connections.length }
end
# clients available for checkout (at time of call)
def available_size
- @pool.length
+ @mutex.synchronize { @pool.length }
end
def checkin(connection)
- synchronize do
- return if @pool.include?(connection)
+ @mutex.synchronize do
+ if @pool.include?(connection)
+ logger.debug { "Pool already contains connection: #{connection.object_id}, @connections.include? #{@connections.include?(connection).inspect}" }
+ return
+ end
- @pool.unshift(connection)
+ @pool << connection
+
@checkin_cond.signal
end
end
# number of threads waiting for connections
def count_waiters #:nodoc:
- @checkin_cond.count_waiters
+ @mutex.synchronize { @count_waiters }
end
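
The old code asked the condition variable directly via count_waiters; 0.7.1 keeps its own counter under @mutex instead. Where the counter is incremented is not visible in these hunks, so the placement in this sketch is an assumption:

def checkout_with_waiter_count # hypothetical name
  @mutex.synchronize do
    begin
      @count_waiters += 1                       # bookkeeping under the same mutex
      @checkin_cond.wait_while { @pool.empty? } # the wait releases @mutex
    ensure
      @count_waiters -= 1
    end
    @pool.shift
  end
end
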
def checkout(blocking=true)
raise ArgumentError, "checkout does not take a block, use .with_connection" if block_given?
- synchronize do
+ @mutex.synchronize do
while true
assert_open!
if @pool.length > 0
cnx = @pool.shift
- # if the cnx isn't connected? then remove it from the pool and go
- # through the loop again. when the cnx's on_connected event fires, it
- # will add the connection back into the pool
- next unless cnx.connected?
+ # if the connection isn't connected, then set up an on_connection
+ # handler and try the next one in the pool
+ unless cnx.connected?
+ logger.debug { "connection #{cnx.object_id} is not connected" }
+ handle_checkin_on_connection(cnx)
+ next
+ end
# otherwise we return the cnx
return cnx
elsif can_grow_pool?
add_connection!
@@ -197,29 +209,60 @@
@checkin_cond.wait_while { @pool.empty? and open? }
next
else
return false
end
- end
+ end # while
end
end
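
Hedged usage sketch of the two checkout modes (assumes the Bounded pool's public checkout/checkin API and a local ZooKeeper): a blocking checkout waits on @checkin_cond, while checkout(false) returns false as soon as nothing is available and the pool cannot grow.

require 'zk'

pool = ZK::Pool::Bounded.new('localhost:2181', :min_clients => 1, :max_clients => 1)

cnx = pool.checkout # blocking: waits until a connection is available
begin
  cnx.children('/')
  second = pool.checkout(false) # => false: pool is exhausted and cannot grow
ensure
  pool.checkin(cnx) # signals @checkin_cond, waking any blocked checkout
end

pool.close_all!
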
+ # @private
+ def can_grow_pool?
+ @mutex.synchronize { @connections.size < @max_clients }
+ end
+
protected
def populate_pool!(num_cnx)
num_cnx.times { add_connection! }
end
def add_connection!
- synchronize do
+ @mutex.synchronize do
cnx = create_connection
@connections << cnx
- cnx.on_connected { checkin(cnx) }
- end
+ handle_checkin_on_connection(cnx)
+ end # synchronize
end
- def can_grow_pool?
- synchronize { @connections.size < @max_clients }
+ def handle_checkin_on_connection(cnx)
+ @mutex.synchronize do
+ do_checkin = lambda do
+ checkin(cnx)
+ end
+
+ if cnx.connected?
+ do_checkin.call
+ return
+ else
+ @on_connected_subs.synchronize do
+
+ sub = cnx.on_connected do
+ # this synchronization is to prevent a race between setting up the subscription
+ # and assigning it to the @on_connected_subs hash. It's possible that the callback
+ # would fire before we had a chance to add the sub to the hash.
+ @on_connected_subs.synchronize do
+ if sub = @on_connected_subs.delete(cnx)
+ sub.unsubscribe
+ do_checkin.call
+ end
+ end
+ end
+
+ @on_connected_subs[cnx] = sub
+ end
+ end
+ end
end
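
The race described in the comment above generalizes to any "subscribe, then remember the subscription handle" pattern: taking the same lock inside the callback guarantees the handle is stored before the callback can look for it. A standalone sketch with a stand-in event source (all names illustrative, not from the gem):

require 'monitor'

# Stand-in for a connection that fires on_connected callbacks.
class FakeEventSource
  Sub = Struct.new(:callbacks, :block) do
    def unsubscribe
      callbacks.delete(block)
    end
  end

  def initialize
    @callbacks = []
  end

  def on_connected(&block)
    @callbacks << block
    Sub.new(@callbacks, block)
  end

  def fire!
    @callbacks.dup.each(&:call)
  end
end

subs = {}
subs.extend(MonitorMixin)

cnx = FakeEventSource.new

subs.synchronize do
  sub = cnx.on_connected do
    # takes the same lock, so it cannot run until the assignment below completes
    subs.synchronize do
      if (s = subs.delete(cnx))
        s.unsubscribe
        puts "would check #{cnx.object_id} back in here"
      end
    end
  end
  subs[cnx] = sub
end

cnx.fire! # the callback now finds its own subscription in the hash
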
def create_connection
ZK.new(@host, @connection_timeout, @connection_args)
end