# A ConnectionPool manages access to database connections by keeping
# multiple connections and giving threads exclusive access to each
# connection.
class Sequel::ConnectionPool
  # The proc used to create a new database connection.
  attr_accessor :connection_proc

  # The proc used to disconnect a database connection.
  attr_accessor :disconnection_proc

  # The maximum number of connections.
  attr_reader :max_size
  
  # The mutex that protects access to the other internal variables.  You must use
  # this if you want to manipulate the variables safely.
  attr_reader :mutex
  
  # Constructs a new pool with a maximum size. If a block is supplied, it
  # is used to create new connections as they are needed.
  #
  #   pool = ConnectionPool.new(:max_connections=>10) {MyConnection.new(opts)}
  #
  # The connection creation proc can be changed at any time by assigning a 
  # Proc to pool#connection_proc.
  #
  #   pool = ConnectionPool.new(:max_connections=>10)
  #   pool.connection_proc = proc {MyConnection.new(opts)}
  #
  # The connection pool takes the following options:
  #
  # * :disconnection_proc - The proc called when removing connections from the pool.
  # * :max_connections - The maximum number of connections the connection pool
  #   will open (default 4)
  # * :pool_convert_exceptions - Whether to convert non-StandardError based exceptions
  #   to RuntimeError exceptions (default true)
  # * :pool_sleep_time - The amount of time to sleep before attempting to acquire
  #   a connection again (default 0.001)
  # * :pool_timeout - The amount of seconds to wait to acquire a connection
  #   before raising a PoolTimeoutError (default 5)
  # * :servers - A hash of servers to use.  Keys should be symbols.  If not
  #   present, will use a single :default server.  The server name symbol will
  #   be passed to the connection_proc.
  def initialize(opts = {}, &block)
    @max_size = Integer(opts[:max_connections] || 4)
    raise(Sequel::Error, ':max_connections must be positive') if @max_size < 1
    @mutex = Mutex.new
    @connection_proc = block
    @disconnection_proc = opts[:disconnection_proc]
    @servers = [:default]
    @servers += opts[:servers].keys - @servers if opts[:servers] 
    @available_connections = Hash.new{|h,k| h[:default]}
    @allocated = Hash.new{|h,k| h[:default]}
    @servers.each do |s|
      @available_connections[s] = []
      @allocated[s] = {}
    end
    @timeout = Integer(opts[:pool_timeout] || 5)
    @sleep_time = Float(opts[:pool_sleep_time] || 0.001)
    @convert_exceptions = opts.include?(:pool_convert_exceptions) ? opts[:pool_convert_exceptions] : true
  end
  
  # A hash of connections currently being used for the given server, key is the
  # Thread, value is the connection.
  def allocated(server=:default)
    @allocated[server]
  end
  
  # An array of connections opened but not currently used, for the given
  # server.
  def available_connections(server=:default)
    @available_connections[server]
  end
  
  # The total number of connections opened for the given server, should
  # be equal to available_connections.length + allocated.length
  def created_count(server=:default)
    @allocated[server].length + @available_connections[server].length
  end
  alias size created_count
  
  # Chooses the first available connection to the given server, or if none are
  # available, creates a new connection.  Passes the connection to the supplied
  # block:
  # 
  #   pool.hold {|conn| conn.execute('DROP TABLE posts')}
  # 
  # Pool#hold is re-entrant, meaning it can be called recursively in
  # the same thread without blocking.
  #
  # If no connection is immediately available and the pool is already using the maximum
  # number of connections, Pool#hold will block until a connection
  # is available or the timeout expires.  If the timeout expires before a
  # connection can be acquired, a Sequel::PoolTimeout is 
  # raised.
  def hold(server=:default)
    begin
      t = Thread.current
      if conn = owned_connection(t, server)
        return yield(conn)
      end
      begin
        unless conn = acquire(t, server)
          time = Time.new
          timeout = time + @timeout
          sleep_time = @sleep_time
          sleep sleep_time
          until conn = acquire(t, server)
            raise(::Sequel::PoolTimeout) if Time.new > timeout
            sleep sleep_time
          end
        end
        yield conn
      rescue Sequel::DatabaseDisconnectError => dde
        remove(t, conn, server) if conn
        raise
      ensure
        @mutex.synchronize{release(t, server)} if conn && !dde
      end
    rescue StandardError 
      raise
    rescue Exception => e
      raise(@convert_exceptions ? RuntimeError.new(e.message) : e)
    end
  end
  
  # Removes all connection currently available on all servers, optionally
  # yielding each connection to the given block. This method has the effect of 
  # disconnecting from the database, assuming that no connections are currently
  # being used. Once a connection is requested using #hold, the connection pool
  # creates new connections to the database.
  def disconnect(&block)
    block ||= @disconnection_proc
    @mutex.synchronize do
      @available_connections.each do |server, conns|
        conns.each{|c| block.call(c)} if block
        conns.clear
      end
    end
  end
  
  private

  # Assigns a connection to the supplied thread for the given server, if one
  # is available.
  def acquire(thread, server)
    @mutex.synchronize do
      if conn = available(server)
        allocated(server)[thread] = conn
      end
    end
  end
  
  # Returns an available connection to the given server. If no connection is
  # available, tries to create a new connection.
  def available(server)
    available_connections(server).pop || make_new(server)
  end
  
  # Creates a new connection to the given server if the size of the pool for
  # the server is less than the maximum size of the pool.
  def make_new(server)
    if (n = created_count(server)) >= @max_size
      allocated(server).keys.reject{|t| t.alive?}.each{|t| release(t, server)}
      n = nil
    end
    if (n || created_count(server)) < @max_size
      raise(Sequel::Error, "No connection proc specified") unless @connection_proc
      begin
        conn = @connection_proc.call(server)
      rescue Exception=>exception
        raise Sequel.convert_exception_class(exception, Sequel::DatabaseConnectionError)
      end
      raise(Sequel::DatabaseConnectionError, "Connection parameters not valid") unless conn
      conn
    end
  end
  
  # Returns the connection owned by the supplied thread for the given server,
  # if any.
  def owned_connection(thread, server)
    @mutex.synchronize{@allocated[server][thread]}
  end
  
  # Releases the connection assigned to the supplied thread and server.
  # You must already have the mutex before you call this.
  def release(thread, server)
    available_connections(server) << allocated(server).delete(thread)
  end

  # Removes the currently allocated connection from the connection pool.
  def remove(thread, conn, server)
    @mutex.synchronize do
      allocated(server).delete(thread)
      @disconnection_proc.call(conn) if @disconnection_proc
    end
  end
end

# A SingleThreadedPool acts as a replacement for a ConnectionPool for use
# in single-threaded applications. ConnectionPool imposes a substantial
# performance penalty, so SingleThreadedPool is used to gain some speed.
class Sequel::SingleThreadedPool
  # The proc used to create a new database connection
  attr_writer :connection_proc
  
  # The proc used to disconnect a database connection.
  attr_accessor :disconnection_proc

  # Initializes the instance with the supplied block as the connection_proc.
  #
  # The single threaded pool takes the following options:
  #
  # * :disconnection_proc - The proc called when removing connections from the pool.
  # * :pool_convert_exceptions - Whether to convert non-StandardError based exceptions
  #   to RuntimeError exceptions (default true)
  # * :servers - A hash of servers to use.  Keys should be symbols.  If not
  #   present, will use a single :default server.  The server name symbol will
  #   be passed to the connection_proc.
  def initialize(opts={}, &block)
    @connection_proc = block
    @disconnection_proc = opts[:disconnection_proc]
    @conns = {}
    @convert_exceptions = opts.include?(:pool_convert_exceptions) ? opts[:pool_convert_exceptions] : true
  end
  
  # The connection for the given server.
  def conn(server=:default)
    @conns[server]
  end
  
  # Yields the connection to the supplied block for the given server.
  # This method simulates the ConnectionPool#hold API.
  def hold(server=:default)
    begin
      begin
        yield(c = (@conns[server] ||= @connection_proc.call(server)))
      rescue Sequel::DatabaseDisconnectError
        @conns.delete(server)
        @disconnection_proc.call(c) if @disconnection_proc
        raise
      end
    rescue Exception => e
      # if the error is not a StandardError it is converted into RuntimeError.
      raise(@convert_exceptions && !e.is_a?(StandardError) ? RuntimeError.new(e.message) : e)
    end
  end
  
  # Disconnects from the database. Once a connection is requested using
  # #hold, the connection is reestablished.
  def disconnect(&block)
    block ||= @disconnection_proc
    @conns.values.each{|conn| block.call(conn) if block}
    @conns = {}
  end
end