lib/mongo/cluster.rb in mongo-2.12.4 vs lib/mongo/cluster.rb in mongo-2.13.0.beta1

- old
+ new

@@ -1,6 +1,6 @@ -# Copyright (C) 2014-2019 MongoDB, Inc. +# Copyright (C) 2014-2020 MongoDB Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # @@ -74,10 +74,20 @@ # @param [ Array<String> ] seeds The addresses of the configured servers # @param [ Monitoring ] monitoring The monitoring. # @param [ Hash ] options Options. Client constructor forwards its # options to Cluster constructor, although Cluster recognizes # only a subset of the options recognized by Client. + # + # @option options [ true | false ] :direct_connection Whether to connect + # directly to the specified seed, bypassing topology discovery. Exactly + # one seed must be provided. + # @option options [ Symbol ] :connect Deprecated - use :direct_connection + # option instead of this option. The connection method to use. This + # forces the cluster to behave in the specified way instead of + # auto-discovering. One of :direct, :replica_set, :sharded + # @option options [ Symbol ] :replica_set The name of the replica set to + # connect to. Servers not in this replica set will be ignored. # @option options [ true | false ] :scan Whether to scan all seeds # in constructor. The default in driver version 2.x is to do so; # driver version 3.x will not scan seeds in constructor. Opt in to the # new behavior by setting this option to false. *Note:* setting # this option to nil enables scanning seeds in constructor in driver @@ -102,29 +112,43 @@ def initialize(seeds, monitoring, options = Options::Redacted.new) if seeds.nil? raise ArgumentError, 'Seeds cannot be nil' end + options = options.dup if options[:monitoring_io] == false && !options.key?(:cleanup) - options = options.dup options[:cleanup] = false end + @options = options.freeze - seeds = seeds.uniq - + # @update_lock covers @servers, @connecting, @connected, @topology and + # @sessions_supported. Generally instance variables that do not have a + # designated for them lock should only be modified under the update lock. + # Note that topology change is locked by @update_lock and not by + # @sdam_flow_lock. + @update_lock = Mutex.new @servers = [] @monitoring = monitoring @event_listeners = Event::Listeners.new - @options = options.freeze @app_metadata = Server::AppMetadata.new(@options) - @update_lock = Mutex.new - @sdam_flow_lock = Mutex.new - @cluster_time = nil @cluster_time_lock = Mutex.new + @cluster_time = nil @srv_monitor_lock = Mutex.new + @srv_monitor = nil @server_selection_semaphore = Semaphore.new @topology = Topology.initial(self, monitoring, options) + # State change lock is similar to the sdam flow lock, but is designed + # to serialize state changes initated by consumers of Cluster + # (e.g. application connecting or disconnecting the cluster), so that + # e.g. an application calling disconnect-connect-disconnect rapidly + # does not put the cluster into an inconsistent state. + # Monitoring updates performed internally by the driver do not take + # the state change lock. + @state_change_lock = Mutex.new + # @sdam_flow_lock covers just the sdam flow. Note it does not apply + # to @topology replacements which are done under @update_lock. + @sdam_flow_lock = Mutex.new Session::SessionPool.create(self) # The opening topology is always unknown with no servers. # https://github.com/mongodb/specifications/pull/388 opening_topology = Topology::Unknown.new(options, monitoring, self) @@ -132,11 +156,11 @@ publish_sdam_event( Monitoring::TOPOLOGY_OPENING, Monitoring::Event::TopologyOpening.new(opening_topology) ) - @seeds = seeds + @seeds = seeds = seeds.uniq servers = seeds.map do |seed| # Server opening events must be sent after topology change events. # Therefore separate server addition, done here before topoolgy change # event is published, from starting to monitor the server which is # done later. @@ -333,21 +357,34 @@ # @return [ Float ] The heartbeat interval, in seconds. # # @since 2.10.0 # @api private def heartbeat_interval - options[:heartbeat_frequency] || Server::Monitor::HEARTBEAT_FREQUENCY + options[:heartbeat_frequency] || Server::Monitor::DEFAULT_HEARTBEAT_INTERVAL end + # Whether the cluster object is in the process of connecting to its cluster. + # + # @return [ true|false ] Whether the cluster is connecting. + # + # @api private + def connecting? + @update_lock.synchronize do + !!@connecting + end + end + # Whether the cluster object is connected to its cluster. # # @return [ true|false ] Whether the cluster is connected. # # @api private # @since 2.7.0 def connected? - !!@connected + @update_lock.synchronize do + !!@connected + end end # Get a list of server candidates from the cluster that can have operations # executed on them. # @@ -368,11 +405,11 @@ # # @return [ Array<Mongo::Address> ] The addresses. # # @since 2.0.6 def addresses - servers_list.map(&:address).dup + servers_list.map(&:address) end # The logical session timeout value in minutes. # # @example Get the logical session timeout in minutes. @@ -439,36 +476,40 @@ # # @return [ true ] Always true. # # @since 2.1.0 def disconnect! - unless @connecting || @connected - return true - end - if options[:cleanup] != false - session_pool.end_sessions - @periodic_executor.stop! - end - @srv_monitor_lock.synchronize do - if @srv_monitor - @srv_monitor.stop! + @state_change_lock.synchronize do + unless connecting? || connected? + return true end - end - @servers.each do |server| - if server.connected? - server.disconnect! - publish_sdam_event( - Monitoring::SERVER_CLOSED, - Monitoring::Event::ServerClosed.new(server.address, topology) - ) + if options[:cleanup] != false + session_pool.end_sessions + @periodic_executor.stop! end + @srv_monitor_lock.synchronize do + if @srv_monitor + @srv_monitor.stop! + end + end + @servers.each do |server| + if server.connected? + server.disconnect! + publish_sdam_event( + Monitoring::SERVER_CLOSED, + Monitoring::Event::ServerClosed.new(server.address, topology) + ) + end + end + publish_sdam_event( + Monitoring::TOPOLOGY_CLOSED, + Monitoring::Event::TopologyClosed.new(topology) + ) + @update_lock.synchronize do + @connecting = @connected = false + end end - publish_sdam_event( - Monitoring::TOPOLOGY_CLOSED, - Monitoring::Event::TopologyClosed.new(topology) - ) - @connecting = @connected = false true end # Reconnect all servers. # @@ -479,23 +520,29 @@ # # @since 2.1.0 # @deprecated Use Client#reconnect to reconnect to the cluster instead of # calling this method. This method does not send SDAM events. def reconnect! - @connecting = true - scan! - servers.each do |server| - server.reconnect! - end - @periodic_executor.restart! - @srv_monitor_lock.synchronize do - if @srv_monitor - @srv_monitor.run! + @state_change_lock.synchronize do + @update_lock.synchronize do + @connecting = true end + scan! + servers.each do |server| + server.reconnect! + end + @periodic_executor.restart! + @srv_monitor_lock.synchronize do + if @srv_monitor + @srv_monitor.run! + end + end + @update_lock.synchronize do + @connecting = false + @connected = true + end end - @connecting = false - @connected = true end # Force a scan of all known servers in the cluster. # # If the sync parameter is true which is the default, the scan is @@ -734,11 +781,18 @@ def add(host, add_options=nil) address = Address.new(host, options) if !addresses.include?(address) server = Server.new(address, self, @monitoring, event_listeners, options.merge( monitor: false)) - @update_lock.synchronize { @servers.push(server) } + @update_lock.synchronize do + # Need to recheck whether server is present in @servers, because + # the previous check was not under a lock. + # Since we are under the update lock here, we cannot call servers_list. + return if @servers.map(&:address).include?(address) + + @servers.push(server) + end if add_options.nil? || add_options[:monitor] != false server.start_monitoring end server end @@ -763,12 +817,20 @@ # public API. # # @since 2.0.0 def remove(host, disconnect: true) address = Address.new(host) - removed_servers = @servers.select { |s| s.address == address } - @update_lock.synchronize { @servers = @servers - removed_servers } + removed_servers = [] + @update_lock.synchronize do + @servers.delete_if do |server| + (server.address == address).tap do |delete| + if delete + removed_servers << server + end + end + end + end if disconnect != false removed_servers.each do |server| disconnect_server_if_connected(server) end end @@ -779,32 +841,40 @@ end end # @api private def update_topology(new_topology) - old_topology = topology - @topology = new_topology + old_topology = nil + @update_lock.synchronize do + old_topology = topology + @topology = new_topology + end # If new topology has data bearing servers, we know for sure whether # sessions are supported - update our cached value. # If new topology has no data bearing servers, leave the old value # as it is and sessions_supported? method will perform server selection # to try to determine session support accurately, falling back to the # last known value. if topology.data_bearing_servers? - @sessions_supported = !!topology.logical_session_timeout + sessions_supported = !!topology.logical_session_timeout + @update_lock.synchronize do + @sessions_supported = sessions_supported + end end publish_sdam_event( Monitoring::TOPOLOGY_CHANGED, Monitoring::Event::TopologyChanged.new(old_topology, topology) ) end # @api private def servers_list - @update_lock.synchronize { @servers.dup } + @update_lock.synchronize do + @servers.dup + end end # @api private def disconnect_server_if_connected(server) if server.connected? @@ -840,10 +910,12 @@ ServerSelector.get(mode: :primary_preferred).select_server(self) !!topology.logical_session_timeout rescue Error::NoServerAvailable # We haven't been able to contact any servers - use last known # value for esssion support. - @sessions_supported || false + @update_lock.synchronize do + @sessions_supported || false + end end end private