module ZK
  module Pool
    # Common lifecycle tracking and delegation helpers for connection pools.
    #
    # The pool moves through the states :init -> :open -> :closing ->
    # :forced -> :closed. All shared state is guarded by a monitor mixed into
    # the @connections array; @checkin_cond is a condition variable on that
    # monitor that is signalled when a connection is returned and broadcast
    # when the pool has finished closing.
    #
    # This class calls +checkout+ and +checkin+ but does not define them;
    # subclasses (see Bounded) provide them and flip the state to :open.
    class Base
      attr_reader :connections #:nodoc:

      def initialize
        @state = :init

        # Connections currently available for checkout. Initialized here
        # because close_all!, force_close! and size all read it.
        @pool = []

        # Every connection the pool has allocated, checked out or not. The
        # array doubles as the monitor used by #synchronize.
        @connections = []
        @connections.extend(MonitorMixin)
        @checkin_cond = @connections.new_cond
      end

      # has the pool finished closing (all connections closed)?
      def closed?
        @state == :closed
      end

      # is the pool shutting down (waiting for checked-out connections)?
      def closing?
        @state == :closing
      end

      # is the pool initialized and in normal operation?
      def open?
        @state == :open
      end

      # has the pool entered the take-no-prisoners connection closing part of shutdown?
      def forced?
        @state == :forced
      end

      # Gracefully close every connection in the pool.
      #
      # Does nothing unless the pool is open. Otherwise marks the pool as
      # :closing (so new checkouts are refused), waits until every allocated
      # connection is back in the available list (or another thread has
      # already closed the pool), and then calls force_close!.
      def close_all!
        synchronize do
          return unless open?
          @state = :closing

          @checkin_cond.wait_until { (@pool.size == @connections.length) || closed? }

          force_close!
        end
      end

      # calls close! on all connection objects, whether or not they're back in the pool
      # this is DANGEROUS!
      def force_close! #:nodoc:
        synchronize do
          return if closed? || forced?

          @state = :forced
          @pool.clear

          while (cnx = @connections.shift)
            cnx.close!
          end

          @state = :closed

          # wake every thread blocked on the condition variable so it can
          # observe the closed state
          @checkin_cond.broadcast
        end
      end

      # yields next available connection to the block and returns the
      # block's result; the connection is checked back in afterwards, even
      # if the block raises
      #
      # raises PoolIsShuttingDownException immediately if close_all! has been
      # called on this pool
      def with_connection
        assert_open!

        cnx = checkout(true)
        yield cnx
      ensure
        # cnx is nil when assert_open! or checkout raised; checking nil in
        # would insert a bogus entry into the available list.
        checkin(cnx) if cnx
      end

      # returns connection.locker(path) from a pooled connection; the
      # returned lock lives on past the connection checkout
      def locker(path)
        with_connection { |connection| connection.locker(path) }
      end

      # prefer this method if you can (keeps connection checked out while
      # the block runs)
      def with_lock(name, opts={}, &block)
        with_connection { |connection| connection.with_lock(name, opts, &block) }
      end

      # forwards any method this pool does not define to a pooled
      # connection, checking it out only for the duration of the call
      #
      # NOTE(review): there is no matching respond_to_missing?, so
      # respond_to? will not report the forwarded methods.
      def method_missing(meth, *args, &block)
        with_connection { |connection| connection.__send__(meth, *args, &block) }
      end

      # number of connections currently in the available list
      def size #:nodoc:
        @pool.size
      end

      # the raw lifecycle state symbol
      def pool_state #:nodoc:
        @state
      end

      protected

      # runs the block while holding the monitor on @connections
      def synchronize
        @connections.synchronize { yield }
      end

      # raises PoolIsShuttingDownException unless the pool is in the :open state
      def assert_open!
        raise Exceptions::PoolIsShuttingDownException unless open?
      end
    end # Base

    # like a Simple pool but has high/low watermarks, and can grow dynamically as needed
    class Bounded < Base
      DEFAULT_OPTIONS = {
        :timeout     => 10,
        :min_clients => 1,
        :max_clients => 10,
      }.freeze

      # opts:
      # * :timeout: connection establishment timeout
      # * :min_clients: how many clients we should start out with
      # * :max_clients: the maximum number of clients we will create in response to demand
      #
      # Missing options fall back to DEFAULT_OPTIONS. The pool is populated
      # with :min_clients connections and marked open before this returns.
      def initialize(host, opts={})
        super()
        @host = host

        # NOTE(review): the caller's hash is forwarded to ZK.new as-is, so
        # it still contains any pool-only keys (:timeout, :min_clients,
        # :max_clients) the caller supplied — confirm ZK.new tolerates them.
        @connection_args = opts

        settings = DEFAULT_OPTIONS.merge(opts)

        @min_clients = Integer(settings[:min_clients])
        @max_clients = Integer(settings[:max_clients])
        @connection_timeout = settings[:timeout]

        synchronize do
          populate_pool!(@min_clients)
          @state = :open
        end
      end

      # returns the current number of allocated clients in the pool (not
      # available clients)
      def size
        @connections.length
      end

      # clients available for checkout (at time of call)
      def available_size
        @pool.length
      end

      # returns a connection to the available list and wakes one waiting
      # thread; a connection that is already in the list is ignored
      def checkin(connection)
        synchronize do
          return if @pool.include?(connection)

          @pool.unshift(connection)
          @checkin_cond.signal
        end
      end

      # number of threads waiting for connections
      #
      # NOTE(review): relies on the condition variable responding to
      # count_waiters — confirm for the Ruby version in use.
      def count_waiters #:nodoc:
        @checkin_cond.count_waiters
      end

      # Takes a connection out of the pool.
      #
      # Returns an available connected connection, creating a new one when
      # none is available and the pool is below :max_clients. When the pool
      # is exhausted, waits for a checkin if +blocking+ is true, otherwise
      # returns false.
      #
      # raises ArgumentError if given a block (use with_connection instead)
      # raises PoolIsShuttingDownException if the pool is not open, including
      # when it stops being open while this call is waiting
      def checkout(blocking=true)
        raise ArgumentError, "checkout does not take a block, use .with_connection" if block_given?

        synchronize do
          loop do
            assert_open!

            if !@pool.empty?
              cnx = @pool.shift

              # if the cnx isn't connected? then leave it out of the pool and go
              # through the loop again. when the cnx's on_connected event fires, it
              # will add the connection back into the pool
              next unless cnx.connected?

              return cnx
            elsif can_grow_pool?
              add_connection!
              next
            elsif blocking
              # woken by a checkin (signal) or by the pool closing
              # (broadcast); the loop re-checks the state either way
              @checkin_cond.wait_while { @pool.empty? && open? }
              next
            else
              return false
            end
          end
        end
      end

      protected

      # allocates +num_cnx+ new connections
      def populate_pool!(num_cnx)
        num_cnx.times { add_connection! }
      end

      # creates a connection, records it as allocated, and arranges for it
      # to be checked in to the available list when its on_connected
      # callback fires
      def add_connection!
        synchronize do
          cnx = create_connection
          @connections << cnx

          cnx.on_connected { checkin(cnx) }
        end
      end

      # true while fewer than :max_clients connections have been allocated
      def can_grow_pool?
        synchronize { @connections.size < @max_clients }
      end

      # builds a new client for this pool's host
      def create_connection
        ZK.new(@host, @connection_timeout, @connection_args)
      end
    end # Bounded

    # create a connection pool useful for multithreaded applications
    #
    # Will spin up +number_of_connections+ at creation time and remain fixed at
    # that number for the life of the pool.
    #
    # ==== Example
    #   pool = ZK::Pool::Simple.new("localhost:2181", 10)
    #   pool.with_connection do |zk|
    #     zk.create("/mynew_path")
    #   end
    class Simple < Bounded
      # initialize a connection pool using the same options as ZK.new
      # @param String host the same arguments as ZK.new
      # @param Integer number_of_connections the number of connections to put in the pool
      # @param optional Hash opts Options to pass on to each connection
      #
      # The caller's +opts+ hash is copied, not modified; :min_clients and
      # :max_clients are both set to +number_of_connections+.
      def initialize(host, number_of_connections=10, opts = {})
        opts = opts.dup
        opts[:max_clients] = opts[:min_clients] = number_of_connections.to_i
        super(host, opts)
      end
    end # Simple
  end # Pool
end # ZK