# frozen_string_literal: true require "concurrent/map" require "monitor" require "active_record/connection_adapters/abstract/connection_pool/queue" require "active_record/connection_adapters/abstract/connection_pool/reaper" module ActiveRecord module ConnectionAdapters module AbstractPool # :nodoc: end class NullPool # :nodoc: include ConnectionAdapters::AbstractPool class NullConfig # :nodoc: def method_missing(...) nil end end NULL_CONFIG = NullConfig.new # :nodoc: def initialize super() @mutex = Mutex.new @server_version = nil end def server_version(connection) # :nodoc: @server_version || @mutex.synchronize { @server_version ||= connection.get_database_version } end def schema_reflection SchemaReflection.new(nil) end def schema_cache; end def connection_class; end def checkin(_); end def remove(_); end def async_executor; end def db_config NULL_CONFIG end def dirties_query_cache true end end # = Active Record Connection Pool # # Connection pool base class for managing Active Record database # connections. # # == Introduction # # A connection pool synchronizes thread access to a limited number of # database connections. The basic idea is that each thread checks out a # database connection from the pool, uses that connection, and checks the # connection back in. ConnectionPool is completely thread-safe, and will # ensure that a connection cannot be used by two threads at the same time, # as long as ConnectionPool's contract is correctly followed. It will also # handle cases in which there are more threads than connections: if all # connections have been checked out, and a thread tries to checkout a # connection anyway, then ConnectionPool will wait until some other thread # has checked in a connection, or the +checkout_timeout+ has expired. # # == Obtaining (checking out) a connection # # Connections can be obtained and used from a connection pool in several # ways: # # 1. Simply use {ActiveRecord::Base.lease_connection}[rdoc-ref:ConnectionHandling#lease_connection]. # When you're done with the connection(s) and wish it to be returned to the pool, you call # {ActiveRecord::Base.connection_handler.clear_active_connections!}[rdoc-ref:ConnectionAdapters::ConnectionHandler#clear_active_connections!]. # This is the default behavior for Active Record when used in conjunction with # Action Pack's request handling cycle. # 2. Manually check out a connection from the pool with # {ActiveRecord::Base.connection_pool.checkout}[rdoc-ref:#checkout]. You are responsible for # returning this connection to the pool when finished by calling # {ActiveRecord::Base.connection_pool.checkin(connection)}[rdoc-ref:#checkin]. # 3. Use {ActiveRecord::Base.connection_pool.with_connection(&block)}[rdoc-ref:#with_connection], which # obtains a connection, yields it as the sole argument to the block, # and returns it to the pool after the block completes. # # Connections in the pool are actually AbstractAdapter objects (or objects # compatible with AbstractAdapter's interface). # # While a thread has a connection checked out from the pool using one of the # above three methods, that connection will automatically be the one used # by ActiveRecord queries executing on that thread. It is not required to # explicitly pass the checked out connection to \Rails models or queries, for # example. # # == Options # # There are several connection-pooling-related options that you can add to # your database connection configuration: # # * +pool+: maximum number of connections the pool may manage (default 5). # * +idle_timeout+: number of seconds that a connection will be kept # unused in the pool before it is automatically disconnected (default # 300 seconds). Set this to zero to keep connections forever. # * +checkout_timeout+: number of seconds to wait for a connection to # become available before giving up and raising a timeout error (default # 5 seconds). # #-- # Synchronization policy: # * all public methods can be called outside +synchronize+ # * access to these instance variables needs to be in +synchronize+: # * @connections # * @now_connecting # * private methods that require being called in a +synchronize+ blocks # are now explicitly documented class ConnectionPool class WeakThreadKeyMap # :nodoc: # FIXME: On 3.3 we could use ObjectSpace::WeakKeyMap # but it currently cause GC crashes: https://github.com/byroot/rails/pull/3 def initialize @map = {} end def clear @map.clear end def [](key) @map[key] end def []=(key, value) @map.select! { |c, _| c.alive? } @map[key] = value end end class Lease # :nodoc: attr_accessor :connection, :sticky def initialize @connection = nil @sticky = nil end def release conn = @connection @connection = nil @sticky = nil conn end def clear(connection) if @connection == connection @connection = nil @sticky = nil true else false end end end class LeaseRegistry # :nodoc: def initialize @mutex = Mutex.new @map = WeakThreadKeyMap.new end def [](context) @mutex.synchronize do @map[context] ||= Lease.new end end def clear @mutex.synchronize do @map.clear end end end module ExecutorHooks # :nodoc: class << self def run # noop end def complete(_) ActiveRecord::Base.connection_handler.each_connection_pool do |pool| if (connection = pool.active_connection?) transaction = connection.current_transaction if transaction.closed? || !transaction.joinable? pool.release_connection end end end end end end class << self def install_executor_hooks(executor = ActiveSupport::Executor) executor.register_hook(ExecutorHooks) end end include MonitorMixin prepend QueryCache::ConnectionPoolConfiguration include ConnectionAdapters::AbstractPool attr_accessor :automatic_reconnect, :checkout_timeout attr_reader :db_config, :size, :reaper, :pool_config, :async_executor, :role, :shard delegate :schema_reflection, :server_version, to: :pool_config # Creates a new ConnectionPool object. +pool_config+ is a PoolConfig # object which describes database connection information (e.g. adapter, # host name, username, password, etc), as well as the maximum size for # this ConnectionPool. # # The default ConnectionPool maximum size is 5. def initialize(pool_config) super() @pool_config = pool_config @db_config = pool_config.db_config @role = pool_config.role @shard = pool_config.shard @checkout_timeout = db_config.checkout_timeout @idle_timeout = db_config.idle_timeout @size = db_config.pool # This variable tracks the cache of threads mapped to reserved connections, with the # sole purpose of speeding up the +connection+ method. It is not the authoritative # registry of which thread owns which connection. Connection ownership is tracked by # the +connection.owner+ attr on each +connection+ instance. # The invariant works like this: if there is mapping of thread => conn, # then that +thread+ does indeed own that +conn+. However, an absence of such # mapping does not mean that the +thread+ doesn't own the said connection. In # that case +conn.owner+ attr should be consulted. # Access and modification of @leases does not require # synchronization. @leases = LeaseRegistry.new @connections = [] @automatic_reconnect = true # Connection pool allows for concurrent (outside the main +synchronize+ section) # establishment of new connections. This variable tracks the number of threads # currently in the process of independently establishing connections to the DB. @now_connecting = 0 @threads_blocking_new_connections = 0 @available = ConnectionLeasingQueue.new self @pinned_connection = nil @pinned_connections_depth = 0 @async_executor = build_async_executor @schema_cache = nil @reaper = Reaper.new(self, db_config.reaping_frequency) @reaper.run end def inspect # :nodoc: name_field = " name=#{db_config.name.inspect}" unless db_config.name == "primary" shard_field = " shard=#{@shard.inspect}" unless @shard == :default "#<#{self.class.name} env_name=#{db_config.env_name.inspect}#{name_field} role=#{role.inspect}#{shard_field}>" end def schema_cache @schema_cache ||= BoundSchemaReflection.new(schema_reflection, self) end def schema_reflection=(schema_reflection) pool_config.schema_reflection = schema_reflection @schema_cache = nil end def migration_context # :nodoc: MigrationContext.new(migrations_paths, schema_migration, internal_metadata) end def migrations_paths # :nodoc: db_config.migrations_paths || Migrator.migrations_paths end def schema_migration # :nodoc: SchemaMigration.new(self) end def internal_metadata # :nodoc: InternalMetadata.new(self) end # Retrieve the connection associated with the current thread, or call # #checkout to obtain one if necessary. # # #lease_connection can be called any number of times; the connection is # held in a cache keyed by a thread. def lease_connection lease = connection_lease lease.sticky = true lease.connection ||= checkout end def permanent_lease? # :nodoc: connection_lease.sticky.nil? end def pin_connection!(lock_thread) # :nodoc: @pinned_connection ||= (connection_lease&.connection || checkout) @pinned_connections_depth += 1 # Any leased connection must be in @connections otherwise # some methods like #connected? won't behave correctly unless @connections.include?(@pinned_connection) @connections << @pinned_connection end @pinned_connection.lock_thread = ActiveSupport::IsolatedExecutionState.context if lock_thread @pinned_connection.verify! # eagerly validate the connection @pinned_connection.begin_transaction joinable: false, _lazy: false end def unpin_connection! # :nodoc: raise "There isn't a pinned connection #{object_id}" unless @pinned_connection clean = true @pinned_connection.lock.synchronize do @pinned_connections_depth -= 1 connection = @pinned_connection @pinned_connection = nil if @pinned_connections_depth.zero? if connection.transaction_open? connection.rollback_transaction else # Something committed or rolled back the transaction clean = false connection.reset! end if @pinned_connection.nil? connection.steal! connection.lock_thread = nil checkin(connection) end end clean end def connection_class # :nodoc: pool_config.connection_class end # Returns true if there is an open connection being used for the current thread. # # This method only works for connections that have been obtained through # #lease_connection or #with_connection methods. Connections obtained through # #checkout will not be detected by #active_connection? def active_connection? connection_lease.connection end alias_method :active_connection, :active_connection? # :nodoc: # Signal that the thread is finished with the current connection. # #release_connection releases the connection-thread association # and returns the connection to the pool. # # This method only works for connections that have been obtained through # #lease_connection or #with_connection methods, connections obtained through # #checkout will not be automatically released. def release_connection(existing_lease = nil) if conn = connection_lease.release checkin conn return true end false end # Yields a connection from the connection pool to the block. If no connection # is already checked out by the current thread, a connection will be checked # out from the pool, yielded to the block, and then returned to the pool when # the block is finished. If a connection has already been checked out on the # current thread, such as via #lease_connection or #with_connection, that existing # connection will be the one yielded and it will not be returned to the pool # automatically at the end of the block; it is expected that such an existing # connection will be properly returned to the pool by the code that checked # it out. def with_connection(prevent_permanent_checkout: false) lease = connection_lease sticky_was = lease.sticky lease.sticky = false if prevent_permanent_checkout if lease.connection begin yield lease.connection ensure lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was end else begin yield lease.connection = checkout ensure lease.sticky = sticky_was if prevent_permanent_checkout && !sticky_was release_connection(lease) unless lease.sticky end end end # Returns true if a connection has already been opened. def connected? synchronize { @connections.any?(&:connected?) } end # Returns an array containing the connections currently in the pool. # Access to the array does not require synchronization on the pool because # the array is newly created and not retained by the pool. # # However; this method bypasses the ConnectionPool's thread-safe connection # access pattern. A returned connection may be owned by another thread, # unowned, or by happen-stance owned by the calling thread. # # Calling methods on a connection without ownership is subject to the # thread-safety guarantees of the underlying method. Many of the methods # on connection adapter classes are inherently multi-thread unsafe. def connections synchronize { @connections.dup } end # Disconnects all connections in the pool, and clears the pool. # # Raises: # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all # connections in the pool within a timeout interval (default duration is # spec.db_config.checkout_timeout * 2 seconds). def disconnect(raise_on_acquisition_timeout = true) with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! end @connections = [] @leases.clear @available.clear end end end # Disconnects all connections in the pool, and clears the pool. # # The pool first tries to gain ownership of all connections. If unable to # do so within a timeout interval (default duration is # spec.db_config.checkout_timeout * 2 seconds), then the pool is forcefully # disconnected without any regard for other connection owning threads. def disconnect! disconnect(false) end # Discards all connections in the pool (even if they're currently # leased!), along with the pool itself. Any further interaction with the # pool (except #spec and #schema_cache) is undefined. # # See AbstractAdapter#discard! def discard! # :nodoc: synchronize do return if self.discarded? @connections.each do |conn| conn.discard! end @connections = @available = @leases = nil end end def discarded? # :nodoc: @connections.nil? end # Clears the cache which maps classes and re-connects connections that # require reloading. # # Raises: # - ActiveRecord::ExclusiveConnectionTimeoutError if unable to gain ownership of all # connections in the pool within a timeout interval (default duration is # spec.db_config.checkout_timeout * 2 seconds). def clear_reloadable_connections(raise_on_acquisition_timeout = true) with_exclusively_acquired_all_connections(raise_on_acquisition_timeout) do synchronize do @connections.each do |conn| if conn.in_use? conn.steal! checkin conn end conn.disconnect! if conn.requires_reloading? end @connections.delete_if(&:requires_reloading?) @available.clear end end end # Clears the cache which maps classes and re-connects connections that # require reloading. # # The pool first tries to gain ownership of all connections. If unable to # do so within a timeout interval (default duration is # spec.db_config.checkout_timeout * 2 seconds), then the pool forcefully # clears the cache and reloads connections without any regard for other # connection owning threads. def clear_reloadable_connections! clear_reloadable_connections(false) end # Check-out a database connection from the pool, indicating that you want # to use it. You should call #checkin when you no longer need this. # # This is done by either returning and leasing existing connection, or by # creating a new connection and leasing it. # # If all connections are leased and the pool is at capacity (meaning the # number of currently leased connections is greater than or equal to the # size limit set), an ActiveRecord::ConnectionTimeoutError exception will be raised. # # Returns: an AbstractAdapter object. # # Raises: # - ActiveRecord::ConnectionTimeoutError no connection can be obtained from the pool. def checkout(checkout_timeout = @checkout_timeout) if @pinned_connection @pinned_connection.lock.synchronize do synchronize do @pinned_connection.verify! # Any leased connection must be in @connections otherwise # some methods like #connected? won't behave correctly unless @connections.include?(@pinned_connection) @connections << @pinned_connection end end end @pinned_connection else checkout_and_verify(acquire_connection(checkout_timeout)) end end # Check-in a database connection back into the pool, indicating that you # no longer need this connection. # # +conn+: an AbstractAdapter object, which was obtained by earlier by # calling #checkout on this pool. def checkin(conn) return if @pinned_connection.equal?(conn) conn.lock.synchronize do synchronize do connection_lease.clear(conn) conn._run_checkin_callbacks do conn.expire end @available.add conn end end end # Remove a connection from the connection pool. The connection will # remain open and active but will no longer be managed by this pool. def remove(conn) needs_new_connection = false synchronize do remove_connection_from_thread_cache conn @connections.delete conn @available.delete conn # @available.any_waiting? => true means that prior to removing this # conn, the pool was at its max size (@connections.size == @size). # This would mean that any threads stuck waiting in the queue wouldn't # know they could checkout_new_connection, so let's do it for them. # Because condition-wait loop is encapsulated in the Queue class # (that in turn is oblivious to ConnectionPool implementation), threads # that are "stuck" there are helpless. They have no way of creating # new connections and are completely reliant on us feeding available # connections into the Queue. needs_new_connection = @available.any_waiting? end # This is intentionally done outside of the synchronized section as we # would like not to hold the main mutex while checking out new connections. # Thus there is some chance that needs_new_connection information is now # stale, we can live with that (bulk_make_new_connections will make # sure not to exceed the pool's @size limit). bulk_make_new_connections(1) if needs_new_connection end # Recover lost connections for the pool. A lost connection can occur if # a programmer forgets to checkin a connection at the end of a thread # or a thread dies unexpectedly. def reap stale_connections = synchronize do return if self.discarded? @connections.select do |conn| conn.in_use? && !conn.owner.alive? end.each do |conn| conn.steal! end end stale_connections.each do |conn| if conn.active? conn.reset! checkin conn else remove conn end end end # Disconnect all connections that have been idle for at least # +minimum_idle+ seconds. Connections currently checked out, or that were # checked in less than +minimum_idle+ seconds ago, are unaffected. def flush(minimum_idle = @idle_timeout) return if minimum_idle.nil? idle_connections = synchronize do return if self.discarded? @connections.select do |conn| !conn.in_use? && conn.seconds_idle >= minimum_idle end.each do |conn| conn.lease @available.delete conn @connections.delete conn end end idle_connections.each do |conn| conn.disconnect! end end # Disconnect all currently idle connections. Connections currently checked # out are unaffected. def flush! reap flush(-1) end def num_waiting_in_queue # :nodoc: @available.num_waiting end # Returns the connection pool's usage statistic. # # ActiveRecord::Base.connection_pool.stat # => { size: 15, connections: 1, busy: 1, dead: 0, idle: 0, waiting: 0, checkout_timeout: 5 } def stat synchronize do { size: size, connections: @connections.size, busy: @connections.count { |c| c.in_use? && c.owner.alive? }, dead: @connections.count { |c| c.in_use? && !c.owner.alive? }, idle: @connections.count { |c| !c.in_use? }, waiting: num_waiting_in_queue, checkout_timeout: checkout_timeout } end end def schedule_query(future_result) # :nodoc: @async_executor.post { future_result.execute_or_skip } Thread.pass end private def connection_lease @leases[ActiveSupport::IsolatedExecutionState.context] end def build_async_executor case ActiveRecord.async_query_executor when :multi_thread_pool if @db_config.max_threads > 0 Concurrent::ThreadPoolExecutor.new( min_threads: @db_config.min_threads, max_threads: @db_config.max_threads, max_queue: @db_config.max_queue, fallback_policy: :caller_runs ) end when :global_thread_pool ActiveRecord.global_thread_pool_async_query_executor end end #-- # this is unfortunately not concurrent def bulk_make_new_connections(num_new_conns_needed) num_new_conns_needed.times do # try_to_checkout_new_connection will not exceed pool's @size limit if new_conn = try_to_checkout_new_connection # make the new_conn available to the starving threads stuck @available Queue checkin(new_conn) end end end # Take control of all existing connections so a "group" action such as # reload/disconnect can be performed safely. It is no longer enough to # wrap it in +synchronize+ because some pool's actions are allowed # to be performed outside of the main +synchronize+ block. def with_exclusively_acquired_all_connections(raise_on_acquisition_timeout = true) with_new_connections_blocked do attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout) yield end end def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true) collected_conns = synchronize do reap # No need to wait for dead owners # account for our own connections @connections.select { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context } end newly_checked_out = [] timeout_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (@checkout_timeout * 2) @available.with_a_bias_for(ActiveSupport::IsolatedExecutionState.context) do loop do synchronize do return if collected_conns.size == @connections.size && @now_connecting == 0 remaining_timeout = timeout_time - Process.clock_gettime(Process::CLOCK_MONOTONIC) remaining_timeout = 0 if remaining_timeout < 0 conn = checkout_for_exclusive_access(remaining_timeout) collected_conns << conn newly_checked_out << conn end end end rescue ExclusiveConnectionTimeoutError # raise_on_acquisition_timeout == false means we are directed to ignore any # timeouts and are expected to just give up: we've obtained as many connections # as possible, note that in a case like that we don't return any of the # +newly_checked_out+ connections. if raise_on_acquisition_timeout release_newly_checked_out = true raise end rescue Exception # if something else went wrong # this can't be a "naked" rescue, because we have should return conns # even for non-StandardErrors release_newly_checked_out = true raise ensure if release_newly_checked_out && newly_checked_out # releasing only those conns that were checked out in this method, conns # checked outside this method (before it was called) are not for us to release newly_checked_out.each { |conn| checkin(conn) } end end #-- # Must be called in a synchronize block. def checkout_for_exclusive_access(checkout_timeout) checkout(checkout_timeout) rescue ConnectionTimeoutError # this block can't be easily moved into attempt_to_checkout_all_existing_connections's # rescue block, because doing so would put it outside of synchronize section, without # being in a critical section thread_report might become inaccurate msg = +"could not obtain ownership of all database connections in #{checkout_timeout} seconds" thread_report = [] @connections.each do |conn| unless conn.owner == ActiveSupport::IsolatedExecutionState.context thread_report << "#{conn} is owned by #{conn.owner}" end end msg << " (#{thread_report.join(', ')})" if thread_report.any? raise ExclusiveConnectionTimeoutError.new(msg, connection_pool: self) end def with_new_connections_blocked synchronize do @threads_blocking_new_connections += 1 end yield ensure num_new_conns_required = 0 synchronize do @threads_blocking_new_connections -= 1 if @threads_blocking_new_connections.zero? @available.clear num_new_conns_required = num_waiting_in_queue @connections.each do |conn| next if conn.in_use? @available.add conn num_new_conns_required -= 1 end end end bulk_make_new_connections(num_new_conns_required) if num_new_conns_required > 0 end # Acquire a connection by one of 1) immediately removing one # from the queue of available connections, 2) creating a new # connection if the pool is not at capacity, 3) waiting on the # queue for a connection to become available. # # Raises: # - ActiveRecord::ConnectionTimeoutError if a connection could not be acquired # #-- # Implementation detail: the connection returned by +acquire_connection+ # will already be "+connection.lease+ -ed" to the current thread. def acquire_connection(checkout_timeout) # NOTE: we rely on @available.poll and +try_to_checkout_new_connection+ to # +conn.lease+ the returned connection (and to do this in a +synchronized+ # section). This is not the cleanest implementation, as ideally we would # synchronize { conn.lease } in this method, but by leaving it to @available.poll # and +try_to_checkout_new_connection+ we can piggyback on +synchronize+ sections # of the said methods and avoid an additional +synchronize+ overhead. if conn = @available.poll || try_to_checkout_new_connection conn else reap # Retry after reaping, which may return an available connection, # remove an inactive connection, or both if conn = @available.poll || try_to_checkout_new_connection conn else @available.poll(checkout_timeout) end end rescue ConnectionTimeoutError => ex raise ex.set_pool(self) end #-- # if owner_thread param is omitted, this must be called in synchronize block def remove_connection_from_thread_cache(conn, owner_thread = conn.owner) @leases[owner_thread].clear(conn) end alias_method :release, :remove_connection_from_thread_cache def new_connection connection = db_config.new_connection connection.pool = self connection rescue ConnectionNotEstablished => ex raise ex.set_pool(self) end # If the pool is not at a @size limit, establish new connection. Connecting # to the DB is done outside main synchronized section. #-- # Implementation constraint: a newly established connection returned by this # method must be in the +.leased+ state. def try_to_checkout_new_connection # first in synchronized section check if establishing new conns is allowed # and increment @now_connecting, to prevent overstepping this pool's @size # constraint do_checkout = synchronize do if @threads_blocking_new_connections.zero? && (@connections.size + @now_connecting) < @size @now_connecting += 1 end end if do_checkout begin # if successfully incremented @now_connecting establish new connection # outside of synchronized section conn = checkout_new_connection ensure synchronize do if conn adopt_connection(conn) # returned conn needs to be already leased conn.lease end @now_connecting -= 1 end end end end def adopt_connection(conn) conn.pool = self @connections << conn # We just created the first connection, it's time to load the schema # cache if that wasn't eagerly done before if @schema_cache.nil? && ActiveRecord.lazily_load_schema_cache schema_cache.load! end end def checkout_new_connection raise ConnectionNotEstablished unless @automatic_reconnect new_connection end def checkout_and_verify(c) c._run_checkout_callbacks do c.clean! end c rescue Exception remove c c.disconnect! raise end end end end