lib/mongo/cluster.rb in mongo-2.12.4 vs lib/mongo/cluster.rb in mongo-2.13.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-# Copyright (C) 2014-2019 MongoDB, Inc.
+# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -74,10 +74,20 @@
# @param [ Array<String> ] seeds The addresses of the configured servers
# @param [ Monitoring ] monitoring The monitoring.
# @param [ Hash ] options Options. Client constructor forwards its
# options to Cluster constructor, although Cluster recognizes
# only a subset of the options recognized by Client.
+ #
+ # @option options [ true | false ] :direct_connection Whether to connect
+ # directly to the specified seed, bypassing topology discovery. Exactly
+ # one seed must be provided.
+ # @option options [ Symbol ] :connect Deprecated - use :direct_connection
+ # option instead of this option. The connection method to use. This
+ # forces the cluster to behave in the specified way instead of
+ # auto-discovering. One of :direct, :replica_set, :sharded
+ # @option options [ Symbol ] :replica_set The name of the replica set to
+ # connect to. Servers not in this replica set will be ignored.
# @option options [ true | false ] :scan Whether to scan all seeds
# in constructor. The default in driver version 2.x is to do so;
# driver version 3.x will not scan seeds in constructor. Opt in to the
# new behavior by setting this option to false. *Note:* setting
# this option to nil enables scanning seeds in constructor in driver
@@ -102,29 +112,43 @@
def initialize(seeds, monitoring, options = Options::Redacted.new)
if seeds.nil?
raise ArgumentError, 'Seeds cannot be nil'
end
+ options = options.dup
if options[:monitoring_io] == false && !options.key?(:cleanup)
- options = options.dup
options[:cleanup] = false
end
+ @options = options.freeze
- seeds = seeds.uniq
-
+ # @update_lock covers @servers, @connecting, @connected, @topology and
+ # @sessions_supported. Generally instance variables that do not have a
+ # designated for them lock should only be modified under the update lock.
+ # Note that topology change is locked by @update_lock and not by
+ # @sdam_flow_lock.
+ @update_lock = Mutex.new
@servers = []
@monitoring = monitoring
@event_listeners = Event::Listeners.new
- @options = options.freeze
@app_metadata = Server::AppMetadata.new(@options)
- @update_lock = Mutex.new
- @sdam_flow_lock = Mutex.new
- @cluster_time = nil
@cluster_time_lock = Mutex.new
+ @cluster_time = nil
@srv_monitor_lock = Mutex.new
+ @srv_monitor = nil
@server_selection_semaphore = Semaphore.new
@topology = Topology.initial(self, monitoring, options)
+ # State change lock is similar to the sdam flow lock, but is designed
+ # to serialize state changes initated by consumers of Cluster
+ # (e.g. application connecting or disconnecting the cluster), so that
+ # e.g. an application calling disconnect-connect-disconnect rapidly
+ # does not put the cluster into an inconsistent state.
+ # Monitoring updates performed internally by the driver do not take
+ # the state change lock.
+ @state_change_lock = Mutex.new
+ # @sdam_flow_lock covers just the sdam flow. Note it does not apply
+ # to @topology replacements which are done under @update_lock.
+ @sdam_flow_lock = Mutex.new
Session::SessionPool.create(self)
# The opening topology is always unknown with no servers.
# https://github.com/mongodb/specifications/pull/388
opening_topology = Topology::Unknown.new(options, monitoring, self)
@@ -132,11 +156,11 @@
publish_sdam_event(
Monitoring::TOPOLOGY_OPENING,
Monitoring::Event::TopologyOpening.new(opening_topology)
)
- @seeds = seeds
+ @seeds = seeds = seeds.uniq
servers = seeds.map do |seed|
# Server opening events must be sent after topology change events.
# Therefore separate server addition, done here before topoolgy change
# event is published, from starting to monitor the server which is
# done later.
@@ -333,21 +357,34 @@
# @return [ Float ] The heartbeat interval, in seconds.
#
# @since 2.10.0
# @api private
def heartbeat_interval
- options[:heartbeat_frequency] || Server::Monitor::HEARTBEAT_FREQUENCY
+ options[:heartbeat_frequency] || Server::Monitor::DEFAULT_HEARTBEAT_INTERVAL
end
+ # Whether the cluster object is in the process of connecting to its cluster.
+ #
+ # @return [ true|false ] Whether the cluster is connecting.
+ #
+ # @api private
+ def connecting?
+ @update_lock.synchronize do
+ !!@connecting
+ end
+ end
+
# Whether the cluster object is connected to its cluster.
#
# @return [ true|false ] Whether the cluster is connected.
#
# @api private
# @since 2.7.0
def connected?
- !!@connected
+ @update_lock.synchronize do
+ !!@connected
+ end
end
# Get a list of server candidates from the cluster that can have operations
# executed on them.
#
@@ -368,11 +405,11 @@
#
# @return [ Array<Mongo::Address> ] The addresses.
#
# @since 2.0.6
def addresses
- servers_list.map(&:address).dup
+ servers_list.map(&:address)
end
# The logical session timeout value in minutes.
#
# @example Get the logical session timeout in minutes.
@@ -439,36 +476,40 @@
#
# @return [ true ] Always true.
#
# @since 2.1.0
def disconnect!
- unless @connecting || @connected
- return true
- end
- if options[:cleanup] != false
- session_pool.end_sessions
- @periodic_executor.stop!
- end
- @srv_monitor_lock.synchronize do
- if @srv_monitor
- @srv_monitor.stop!
+ @state_change_lock.synchronize do
+ unless connecting? || connected?
+ return true
end
- end
- @servers.each do |server|
- if server.connected?
- server.disconnect!
- publish_sdam_event(
- Monitoring::SERVER_CLOSED,
- Monitoring::Event::ServerClosed.new(server.address, topology)
- )
+ if options[:cleanup] != false
+ session_pool.end_sessions
+ @periodic_executor.stop!
end
+ @srv_monitor_lock.synchronize do
+ if @srv_monitor
+ @srv_monitor.stop!
+ end
+ end
+ @servers.each do |server|
+ if server.connected?
+ server.disconnect!
+ publish_sdam_event(
+ Monitoring::SERVER_CLOSED,
+ Monitoring::Event::ServerClosed.new(server.address, topology)
+ )
+ end
+ end
+ publish_sdam_event(
+ Monitoring::TOPOLOGY_CLOSED,
+ Monitoring::Event::TopologyClosed.new(topology)
+ )
+ @update_lock.synchronize do
+ @connecting = @connected = false
+ end
end
- publish_sdam_event(
- Monitoring::TOPOLOGY_CLOSED,
- Monitoring::Event::TopologyClosed.new(topology)
- )
- @connecting = @connected = false
true
end
# Reconnect all servers.
#
@@ -479,23 +520,29 @@
#
# @since 2.1.0
# @deprecated Use Client#reconnect to reconnect to the cluster instead of
# calling this method. This method does not send SDAM events.
def reconnect!
- @connecting = true
- scan!
- servers.each do |server|
- server.reconnect!
- end
- @periodic_executor.restart!
- @srv_monitor_lock.synchronize do
- if @srv_monitor
- @srv_monitor.run!
+ @state_change_lock.synchronize do
+ @update_lock.synchronize do
+ @connecting = true
end
+ scan!
+ servers.each do |server|
+ server.reconnect!
+ end
+ @periodic_executor.restart!
+ @srv_monitor_lock.synchronize do
+ if @srv_monitor
+ @srv_monitor.run!
+ end
+ end
+ @update_lock.synchronize do
+ @connecting = false
+ @connected = true
+ end
end
- @connecting = false
- @connected = true
end
# Force a scan of all known servers in the cluster.
#
# If the sync parameter is true which is the default, the scan is
@@ -734,11 +781,18 @@
def add(host, add_options=nil)
address = Address.new(host, options)
if !addresses.include?(address)
server = Server.new(address, self, @monitoring, event_listeners, options.merge(
monitor: false))
- @update_lock.synchronize { @servers.push(server) }
+ @update_lock.synchronize do
+ # Need to recheck whether server is present in @servers, because
+ # the previous check was not under a lock.
+ # Since we are under the update lock here, we cannot call servers_list.
+ return if @servers.map(&:address).include?(address)
+
+ @servers.push(server)
+ end
if add_options.nil? || add_options[:monitor] != false
server.start_monitoring
end
server
end
@@ -763,12 +817,20 @@
# public API.
#
# @since 2.0.0
def remove(host, disconnect: true)
address = Address.new(host)
- removed_servers = @servers.select { |s| s.address == address }
- @update_lock.synchronize { @servers = @servers - removed_servers }
+ removed_servers = []
+ @update_lock.synchronize do
+ @servers.delete_if do |server|
+ (server.address == address).tap do |delete|
+ if delete
+ removed_servers << server
+ end
+ end
+ end
+ end
if disconnect != false
removed_servers.each do |server|
disconnect_server_if_connected(server)
end
end
@@ -779,32 +841,40 @@
end
end
# @api private
def update_topology(new_topology)
- old_topology = topology
- @topology = new_topology
+ old_topology = nil
+ @update_lock.synchronize do
+ old_topology = topology
+ @topology = new_topology
+ end
# If new topology has data bearing servers, we know for sure whether
# sessions are supported - update our cached value.
# If new topology has no data bearing servers, leave the old value
# as it is and sessions_supported? method will perform server selection
# to try to determine session support accurately, falling back to the
# last known value.
if topology.data_bearing_servers?
- @sessions_supported = !!topology.logical_session_timeout
+ sessions_supported = !!topology.logical_session_timeout
+ @update_lock.synchronize do
+ @sessions_supported = sessions_supported
+ end
end
publish_sdam_event(
Monitoring::TOPOLOGY_CHANGED,
Monitoring::Event::TopologyChanged.new(old_topology, topology)
)
end
# @api private
def servers_list
- @update_lock.synchronize { @servers.dup }
+ @update_lock.synchronize do
+ @servers.dup
+ end
end
# @api private
def disconnect_server_if_connected(server)
if server.connected?
@@ -840,10 +910,12 @@
ServerSelector.get(mode: :primary_preferred).select_server(self)
!!topology.logical_session_timeout
rescue Error::NoServerAvailable
# We haven't been able to contact any servers - use last known
# value for esssion support.
- @sessions_supported || false
+ @update_lock.synchronize do
+ @sessions_supported || false
+ end
end
end
private