module ZK
  module Pool
    # Base class for a ZK connection pool. There are some applications that may
    # require high synchronous throughput, which would be a suitable use for a
    # connection pool. The ZK::Client::Threaded class is threadsafe, so it's
    # not a problem accessing it from multiple threads, but it is limited to
    # one outgoing synchronous request at a time, which could cause throughput
    # issues for apps that are making very heavy use of zookeeper.
    #
    # The problem with using a connection pool is the added complexity when you
    # try to use watchers. It may be possible to register a watch with one
    # connection, and then call `:watch => true` on a different connection if
    # you're not careful. Events delivered as part of an event handler have a
    # `zk` attribute which can be used to access the connection that the
    # callback is registered with.
    #
    # Unless you're sure you *need* a connection pool, then avoid the added
    # complexity.
    #
    # This class does not define `checkout` or `checkin` itself; it calls them
    # from #with_connection, so a subclass has to provide them (Bounded does).
    #
    # The pool state moves through :init, :open, :closing, :forced and :closed.
    class Base
      include Logging

      attr_reader :connections #:nodoc:

      def initialize
        @state = :init

        # One monitor guards @state, @connections and @pool; the condition
        # variable is signalled on checkin and when the pool is closed.
        @mutex = Monitor.new
        @checkin_cond = @mutex.new_cond

        @connections = [] # all connections we control
        @pool = []        # currently available connections

        # this is required for 1.8.7 compatibility
        @on_connected_subs = {}
        @on_connected_subs.extend(MonitorMixin)
      end

      # has close_all! been called on this ConnectionPool ?
      def closed?
        @state == :closed
      end

      # is the pool shutting down?
      def closing?
        @state == :closing
      end

      # is the pool initialized and in normal operation?
      def open?
        @state == :open
      end

      # has the pool entered the take-no-prisoners connection closing part of shutdown?
      def forced?
        @state == :forced
      end

      # close all the connections on the pool
      #
      # Does nothing unless the pool is open. Otherwise marks the pool as
      # :closing, waits until every connection has been checked back in (or the
      # pool has been closed by someone else) and then calls force_close!
      def close_all!
        @mutex.synchronize do
          return unless open?
          @state = :closing

          @checkin_cond.wait_until { (@pool.size == @connections.length) || closed? }

          force_close!
        end
      end

      # calls close! on all connection objects, whether or not they're back in the pool
      # this is DANGEROUS!
      # @private
      def force_close!
        @mutex.synchronize do
          return if closed? || forced?
          @state = :forced

          @pool.clear

          until @connections.empty?
            if cnx = @connections.shift
              cnx.close!
            end
          end

          @state = :closed

          # free any waiting
          @checkin_cond.broadcast
        end
      end

      # blocks the caller until the pool state is :closed
      # @private
      def wait_until_closed
        @mutex.synchronize do
          @checkin_cond.wait_until { @state == :closed }
        end
      end

      # yields next available connection to the block
      #
      # raises PoolIsShuttingDownException immediately if close_all! has been
      # called on this pool
      #
      # The connection is checked back in when the block finishes, whether it
      # returns normally or raises.
      def with_connection
        assert_open!

        cnx = checkout(true)
        yield cnx
      ensure
        # cnx is still nil when assert_open! or checkout raised; checking that
        # in would put a nil entry into the pool of available connections.
        checkin(cnx) if cnx
      end

      #lock lives on past the connection checkout
      def locker(path)
        with_connection do |connection|
          connection.locker(path)
        end
      end

      #prefer this method if you can (keeps connection checked out)
      def with_lock(name, opts={}, &block)
        with_connection do |connection|
          connection.with_lock(name, opts, &block)
        end
      end

      # handle all
      #
      # Any method this class does not define is sent to a connection checked
      # out for the duration of that single call.
      def method_missing(meth, *args, &block)
        with_connection do |connection|
          connection.__send__(meth, *args, &block)
        end
      end

      # number of connections currently available for checkout
      def size #:nodoc:
        @mutex.synchronize { @pool.size }
      end

      # the raw state symbol
      def pool_state #:nodoc:
        @state
      end

      protected
        # runs the block while holding the pool monitor
        def synchronize
          @mutex.synchronize { yield }
        end

        # raises PoolIsShuttingDownException unless the pool state is :open
        def assert_open!
          raise Exceptions::PoolIsShuttingDownException, "pool is shutting down" unless open?
        end
    end # Base

    # like a Simple pool but has high/low watermarks, and can grow dynamically as needed
    class Bounded < Base
      DEFAULT_OPTIONS = {
        :timeout     => 10,
        :min_clients => 1,
        :max_clients => 10,
      }.freeze

      # opts:
      # * :timeout: connection establishment timeout
      # * :min_clients: how many clients we should start out with
      # * :max_clients: the maximum number of clients we will create in response to demand
      def initialize(host, opts={})
        super()
        @host = host

        # NOTE(review): this keeps the caller's hash as given, so any
        # :min_clients / :max_clients keys in it are still present when it is
        # merged into the arguments for ZK.new in create_connection. The deletes
        # below act on the merged copy only. Confirm ZK.new tolerates those keys.
        @connection_args = opts

        opts = DEFAULT_OPTIONS.merge(opts)

        @min_clients = Integer(opts.delete(:min_clients))
        @max_clients = Integer(opts.delete(:max_clients))
        @connection_timeout = opts.delete(:timeout)

        @count_waiters = 0

        @mutex.synchronize do
          populate_pool!(@min_clients)
          @state = :open
        end
      end

      # returns the current number of allocated clients in the pool (not
      # available clients)
      def size
        @mutex.synchronize { @connections.length }
      end

      # clients available for checkout (at time of call)
      def available_size
        @mutex.synchronize { @pool.length }
      end

      # Returns a connection to the set of available connections and wakes up
      # any threads waiting in checkout or close_all!. A connection that is
      # already available is logged and otherwise ignored.
      def checkin(connection)
        @mutex.synchronize do
          if @pool.include?(connection)
            logger.debug { "Pool already contains connection: #{connection.object_id}, @connections.include? #{@connections.include?(connection).inspect}" }
            return
          end

          @pool << connection

          @checkin_cond.broadcast
        end
      end

      # number of threads waiting for connections
      def count_waiters #:nodoc:
        @mutex.synchronize { @count_waiters }
      end

      # Takes a connection out of the pool.
      #
      # When no connection is available the pool grows if it is still below
      # :max_clients. Otherwise a blocking checkout waits for a checkin, and a
      # non-blocking checkout returns false.
      #
      # raises ArgumentError if a block is given (use with_connection instead)
      # raises PoolIsShuttingDownException if the pool is not open
      def checkout(blocking=true)
        raise ArgumentError, "checkout does not take a block, use .with_connection" if block_given?

        # synchronize releases the monitor on every exit path (return or
        # exception) and is the locking call used everywhere else in this file.
        @mutex.synchronize do
          loop do
            assert_open!

            if @pool.length > 0
              cnx = @pool.shift

              # XXX(slyphon): not really sure how this case happens, but protect against it as we're
              # seeing an issue in production
              next if cnx.nil?

              # if the connection isn't connected, then set up an on_connection
              # handler and try the next one in the pool
              unless cnx.connected?
                logger.debug { "connection #{cnx.object_id} is not connected" }
                handle_checkin_on_connection(cnx)
                next
              end

              # otherwise we return the cnx
              return cnx
            elsif can_grow_pool?
              add_connection!
              next
            elsif blocking
              @checkin_cond.wait_while { @pool.empty? && open? }
              next
            else
              return false
            end
          end
        end
      end

      # true while fewer than :max_clients connections have been created
      # @private
      def can_grow_pool?
        @mutex.synchronize { @connections.size < @max_clients }
      end

      protected
        # creates num_cnx connections
        def populate_pool!(num_cnx)
          num_cnx.times { add_connection! }
        end

        # Creates one connection, records it in @connections and arranges for
        # it to become available once it is connected.
        def add_connection!
          @mutex.synchronize do
            cnx = create_connection
            @connections << cnx

            handle_checkin_on_connection(cnx)
          end # synchronize
        end

        # Checks cnx in right away if it is connected. Otherwise registers an
        # on_connected callback that checks it in later.
        def handle_checkin_on_connection(cnx)
          @mutex.synchronize do
            do_checkin = lambda do
              checkin(cnx)
            end

            if cnx.connected?
              do_checkin.call
              return
            else
              @on_connected_subs.synchronize do
                sub = cnx.on_connected do
                  # this synchronization is to prevent a race between setting up the subscription
                  # and assigning it to the @on_connected_subs hash. It's possible that the callback
                  # would fire before we had a chance to add the sub to the hash.
                  pending = @on_connected_subs.synchronize { @on_connected_subs.delete(cnx) }

                  # The checkin takes @mutex, so it runs after the subscription
                  # lock has been released. Threads in this method take @mutex
                  # first and the subscription lock second; holding them in the
                  # opposite order here could deadlock against such a thread.
                  if pending
                    pending.unsubscribe
                    do_checkin.call
                  end
                end

                @on_connected_subs[cnx] = sub
              end
            end
          end
        end

        # builds a new client for @host using the pool's connection timeout
        def create_connection
          ZK.new(@host, @connection_args.merge(:timeout => @connection_timeout))
        end
    end # Bounded

    # create a connection pool useful for multithreaded applications
    #
    # Will spin up `number_of_connections` at creation time and remain fixed at
    # that number for the life of the pool.
    #
    # @example
    #
    #   pool = ZK::Pool::Simple.new("localhost:2181", 10, :timeout => 10)
    #   pool.with_connection do |zk|
    #     zk.create("/mynew_path")
    #   end
    #
    class Simple < Bounded
      # initialize a connection pool using the same options as ZK.new
      # @param [String] host the same arguments as ZK.new
      # @param [Integer] number_of_connections the number of connections to put in the pool
      # @param [Hash] opts Options to pass on to each connection
      # @return [ZK::Pool::Simple]
      def initialize(host, number_of_connections=10, opts = {})
        opts = opts.dup
        # min == max pins the pool at a fixed size
        opts[:max_clients] = opts[:min_clients] = number_of_connections.to_i

        super(host, opts)
      end
    end # Simple
  end # Pool
end # ZK