# frozen-string-literal: true # The base connection pool class, which all other connection pools are based # on. This class is not instantiated directly, but subclasses should at # the very least implement the following API: # # initialize(Database, Hash) :: Initialize using the passed Sequel::Database # object and options hash. # hold(Symbol, &block) :: Yield a connection object (obtained from calling # the block passed to +initialize+) to the current block. For sharded # connection pools, the Symbol passed is the shard/server to use. # disconnect(Symbol) :: Disconnect the connection object. For sharded # connection pools, the Symbol passed is the shard/server to use. # servers :: An array of shard/server symbols for all shards/servers that this # connection pool recognizes. # size :: an integer representing the total number of connections in the pool, # or for the given shard/server if sharding is supported. # max_size :: an integer representing the maximum size of the connection pool, # or the maximum size per shard/server if sharding is supported. # # For sharded connection pools, the sharded API adds the following methods: # # add_servers(Array of Symbols) :: start recognizing all shards/servers specified # by the array of symbols. # remove_servers(Array of Symbols) :: no longer recognize all shards/servers # specified by the array of symbols. class Sequel::ConnectionPool OPTS = Sequel::OPTS POOL_CLASS_MAP = { :threaded => :ThreadedConnectionPool, :single => :SingleConnectionPool, :sharded_threaded => :ShardedThreadedConnectionPool, :sharded_single => :ShardedSingleConnectionPool, :timed_queue => :TimedQueueConnectionPool, :sharded_timed_queue => :ShardedTimedQueueConnectionPool, } POOL_CLASS_MAP.to_a.each{|k, v| POOL_CLASS_MAP[k.to_s] = v} POOL_CLASS_MAP.freeze # Class methods used to return an appropriate pool subclass, separated # into a module for easier overridding by extensions. module ClassMethods # Return a pool subclass instance based on the given options. If a :pool_class # option is provided is provided, use that pool class, otherwise # use a new instance of an appropriate pool subclass based on the # +SEQUEL_DEFAULT_CONNECTION_POOL+ environment variable if set, or # the :single_threaded and :servers options, otherwise. def get_pool(db, opts = OPTS) connection_pool_class(opts).new(db, opts) end private # Return a connection pool class based on the given options. def connection_pool_class(opts) if pc = opts[:pool_class] unless pc.is_a?(Class) unless name = POOL_CLASS_MAP[pc] raise Sequel::Error, "unsupported connection pool type, please pass appropriate class as the :pool_class option" end require_relative "connection_pool/#{pc}" pc = Sequel.const_get(name) end pc elsif pc = ENV['SEQUEL_DEFAULT_CONNECTION_POOL'] pc = "sharded_#{pc}" if opts[:servers] && !pc.start_with?('sharded_') connection_pool_class(:pool_class=>pc) else pc = if opts[:single_threaded] opts[:servers] ? :sharded_single : :single # :nocov: elsif RUBY_VERSION >= '3.4' # SEQUEL6 or maybe earlier switch to 3.2 opts[:servers] ? :sharded_timed_queue : :timed_queue # :nocov: else opts[:servers] ? :sharded_threaded : :threaded end connection_pool_class(:pool_class=>pc) end end end extend ClassMethods # The after_connect proc used for this pool. This is called with each new # connection made, and is usually used to set custom per-connection settings. # Deprecated. attr_reader :after_connect # SEQUEL6: Remove # Override the after_connect proc for the connection pool. Deprecated. # Disables support for shard-specific :after_connect and :connect_sqls if used. def after_connect=(v) # SEQUEL6: Remove @use_old_connect_api = true @after_connect = v end # An array of sql strings to execute on each new connection. Deprecated. attr_reader :connect_sqls # SEQUEL6: Remove # Override the connect_sqls for the connection pool. Deprecated. # Disables support for shard-specific :after_connect and :connect_sqls if used. def connect_sqls=(v) # SEQUEL6: Remove @use_old_connect_api = true @connect_sqls = v end # The Sequel::Database object tied to this connection pool. attr_accessor :db # Instantiates a connection pool with the given Database and options. def initialize(db, opts=OPTS) # SEQUEL6: Remove second argument, always use db.opts @db = db @use_old_connect_api = false # SEQUEL6: Remove @after_connect = opts[:after_connect] # SEQUEL6: Remove @connect_sqls = opts[:connect_sqls] # SEQUEL6: Remove @error_classes = db.send(:database_error_classes).dup.freeze end # An array of symbols for all shards/servers, which is a single :default by default. def servers [:default] end private # Remove the connection from the pool. For threaded connections, this should be # called without the mutex, because the disconnection may block. def disconnect_connection(conn) db.disconnect_connection(conn) end # Whether the given exception is a disconnect exception. def disconnect_error?(exception) exception.is_a?(Sequel::DatabaseDisconnectError) || db.send(:disconnect_error?, exception, OPTS) end # Return a new connection by calling the connection proc with the given server name, # and checking for connection errors. def make_new(server) begin if @use_old_connect_api # SEQUEL6: Remove block conn = @db.connect(server) if ac = @after_connect if ac.arity == 2 ac.call(conn, server) else ac.call(conn) end end if cs = @connect_sqls cs.each do |sql| db.send(:log_connection_execute, conn, sql) end end conn else @db.new_connection(server) end rescue Exception=>exception raise Sequel.convert_exception_class(exception, Sequel::DatabaseConnectionError) end || raise(Sequel::DatabaseConnectionError, "Connection parameters not valid") end end