lib/z_k/pool.rb in zk-0.6.5 vs lib/z_k/pool.rb in zk-0.7.1

- old
+ new

@@ -4,13 +4,19 @@ attr_reader :connections #:nodoc: def initialize @state = :init - @connections = [] - @connections.extend(MonitorMixin) - @checkin_cond = @connections.new_cond + @mutex = Monitor.new + @checkin_cond = @mutex.new_cond + + @connections = [] # all connections we control + @pool = [] # currently available connections + + # this is required for 1.8.7 compatibility + @on_connected_subs = {} + @on_connected_subs.extend(MonitorMixin) end # has close_all! been called on this ConnectionPool ? def closed? @state == :closed @@ -32,11 +38,11 @@ end # close all the connections on the pool # @param optional Boolean graceful allow the checked out connections to come back first? def close_all! - synchronize do + @mutex.synchronize do return unless open? @state = :closing @checkin_cond.wait_until { (@pool.size == @connections.length) or closed? } @@ -45,11 +51,11 @@ end # calls close! on all connection objects, whether or not they're back in the pool # this is DANGEROUS! def force_close! #:nodoc: - synchronize do + @mutex.synchronize do return if (closed? or forced?) @state = :forced @pool.clear @@ -97,20 +103,20 @@ connection.__send__(meth, *args, &block) end end def size #:nodoc: - @pool.size + @connection.synchronize { @pool.size } end def pool_state #:nodoc: @state end protected def synchronize - @connections.synchronize { yield } + @mutex.synchronize { yield } end def assert_open! raise Exceptions::PoolIsShuttingDownException unless open? end @@ -138,57 +144,63 @@ @min_clients = Integer(opts.delete(:min_clients)) @max_clients = Integer(opts.delete(:max_clients)) @connection_timeout = opts.delete(:timeout) - # for compatibility w/ ClientPool we'll use @connections for synchronization - @pool = [] # currently available connections + @count_waiters = 0 - synchronize do + @mutex.synchronize do populate_pool!(@min_clients) @state = :open end end # returns the current number of allocated clients in the pool (not # available clients) def size - @connections.length + @mutex.synchronize { @connections.length } end # clients available for checkout (at time of call) def available_size - @pool.length + @mutex.synchronize { @pool.length } end def checkin(connection) - synchronize do - return if @pool.include?(connection) + @mutex.synchronize do + if @pool.include?(connection) + logger.debug { "Pool already contains connection: #{connection.object_id}, @connections.include? #{@connections.include?(connection).inspect}" } + return + end - @pool.unshift(connection) + @pool << connection + @checkin_cond.signal end end # number of threads waiting for connections def count_waiters #:nodoc: - @checkin_cond.count_waiters + @mutex.synchronize { @count_waiters } end def checkout(blocking=true) raise ArgumentError, "checkout does not take a block, use .with_connection" if block_given? - synchronize do + @mutex.synchronize do while true assert_open! if @pool.length > 0 cnx = @pool.shift - # if the cnx isn't connected? then remove it from the pool and go - # through the loop again. when the cnx's on_connected event fires, it - # will add the connection back into the pool - next unless cnx.connected? + # if the connection isn't connected, then set up an on_connection + # handler and try the next one in the pool + unless cnx.connected? + logger.debug { "connection #{cnx.object_id} is not connected" } + handle_checkin_on_connection(cnx) + next + end # otherwise we return the cnx return cnx elsif can_grow_pool? add_connection! @@ -197,29 +209,60 @@ @checkin_cond.wait_while { @pool.empty? and open? } next else return false end - end + end # while end end + # @private + def can_grow_pool? + @mutex.synchronize { @connections.size < @max_clients } + end + protected def populate_pool!(num_cnx) num_cnx.times { add_connection! } end def add_connection! - synchronize do + @mutex.synchronize do cnx = create_connection @connections << cnx - cnx.on_connected { checkin(cnx) } - end + handle_checkin_on_connection(cnx) + end # synchronize end - def can_grow_pool? - synchronize { @connections.size < @max_clients } + def handle_checkin_on_connection(cnx) + @mutex.synchronize do + do_checkin = lambda do + checkin(cnx) + end + + if cnx.connected? + do_checkin.call + return + else + @on_connected_subs.synchronize do + + sub = cnx.on_connected do + # this synchronization is to prevent a race between setting up the subscription + # and assigning it to the @on_connected_subs hash. It's possible that the callback + # would fire before we had a chance to add the sub to the hash. + @on_connected_subs.synchronize do + if sub = @on_connected_subs.delete(cnx) + sub.unsubscribe + do_checkin.call + end + end + end + + @on_connected_subs[cnx] = sub + end + end + end end def create_connection ZK.new(@host, @connection_timeout, @connection_args) end