lib/mongo/cluster.rb in mongo-2.11.1 vs lib/mongo/cluster.rb in mongo-2.11.2
- old
+ new
@@ -160,17 +160,14 @@
# SDAM responses.
@connecting = @connected = false
return
end
- # Need to record start time prior to starting monitoring
- start_time = Time.now
+ # Update instance variables prior to starting monitoring threads.
+ @connecting = false
+ @connected = true
- servers.each do |server|
- server.start_monitoring
- end
-
if options[:cleanup] != false
@cursor_reaper = CursorReaper.new
@socket_reaper = SocketReaper.new(self)
@periodic_executor = PeriodicExecutor.new([
@cursor_reaper, @socket_reaper,
@@ -180,13 +177,17 @@
{}, @periodic_executor, @session_pool))
@periodic_executor.run!
end
- @connecting = false
- @connected = true
+ # Need to record start time prior to starting monitoring
+ start_time = Time.now
+ servers.each do |server|
+ server.start_monitoring
+ end
+
if options[:scan] != false
server_selection_timeout = options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT
# The server selection timeout can be very short especially in
# tests, when the client waits for a synchronous scan before
# starting server selection. Limiting the scan to server selection time
@@ -741,34 +742,53 @@
#
# @example Remove the server from the cluster.
# server.remove('127.0.0.1:27017')
#
# @param [ String ] host The host/port or socket address.
+ # @param [ true | false ] disconnect Whether to disconnect the servers
+ # being removed. For internal driver use only.
#
- # @return [ true|false ] Whether any servers were removed.
+ # @return [ Array<Server> | true | false ] If disconnect is any value other
+ # than false, including nil, returns whether any servers were removed.
+ # If disconnect is false, returns an array of servers that were removed
+ # (and should be disconnected by the caller).
#
- # @since 2.0.0, return value added in 2.7.0
- def remove(host)
+ # @note The return value of this method is not part of the driver's
+ # public API.
+ #
+ # @since 2.0.0
+ def remove(host, disconnect: true)
address = Address.new(host)
removed_servers = @servers.select { |s| s.address == address }
@update_lock.synchronize { @servers = @servers - removed_servers }
- removed_servers.each do |server|
- if server.connected?
- server.disconnect!
- publish_sdam_event(
- Monitoring::SERVER_CLOSED,
- Monitoring::Event::ServerClosed.new(address, topology)
- )
+ if disconnect != false
+ removed_servers.each do |server|
+ disconnect_server_if_connected(server)
end
end
- removed_servers.any?
+ if disconnect != false
+ removed_servers.any?
+ else
+ removed_servers
+ end
end
# @api private
def update_topology(new_topology)
old_topology = topology
@topology = new_topology
+
+ # If new topology has data bearing servers, we know for sure whether
+ # sessions are supported - update our cached value.
+ # If new topology has no data bearing servers, leave the old value
+  # as it is; the sessions_supported? method will perform server selection
+ # to try to determine session support accurately, falling back to the
+ # last known value.
+ if topology.data_bearing_servers?
+ @sessions_supported = !!topology.logical_session_timeout
+ end
+
publish_sdam_event(
Monitoring::TOPOLOGY_CHANGED,
Monitoring::Event::TopologyChanged.new(old_topology, topology)
)
end
@@ -776,55 +796,51 @@
# @api private
def servers_list
@update_lock.synchronize { @servers.dup }
end
- private
-
- # If options[:session] is set, validates that session and returns it.
- # If deployment supports sessions, creates a new session and returns it.
- # The session is implicit unless options[:implicit] is given.
- # If deployment does not support session, returns nil.
- #
- # @note This method will return nil if deployment has no data-bearing
- # servers at the time of the call.
- def get_session(client, options = {})
- return options[:session].validate!(self) if options[:session]
- if sessions_supported?
- Session.new(@session_pool.checkout, client, { implicit: true }.merge(options))
+ # @api private
+ def disconnect_server_if_connected(server)
+ if server.connected?
+ server.disconnect!
+ publish_sdam_event(
+ Monitoring::SERVER_CLOSED,
+ Monitoring::Event::ServerClosed.new(server.address, topology)
+ )
end
end
- def with_session(client, options = {})
- session = get_session(client, options)
- yield(session)
- ensure
- session.end_session if (session && session.implicit?)
- end
-
- # Returns whether the deployment (as this term is defined in the sessions
- # spec) supports sessions.
+ # Returns whether the deployment that the driver is connected to supports
+ # sessions.
#
- # @note If the cluster has no data bearing servers, for example because
- # the deployment is in the middle of a failover, this method returns
- # false.
+ # Session support may change over time, for example due to servers in the
+ # deployment being upgraded or downgraded. This method returns the
+ # current information if the client is connected to at least one data
+ # bearing server. If the client is currently not connected to any data
+ # bearing servers, this method returns the last known value for whether
+ # the deployment supports sessions.
#
- # @note This method returns as soon as the driver connects to any single
- # server in the deployment. Whether deployment overall supports sessions
- # can change depending on how many servers have been contacted, if
- # the servers are configured differently.
+ # @return [ true | false ] Whether deployment supports sessions.
+ # @api private
def sessions_supported?
if topology.data_bearing_servers?
return !!topology.logical_session_timeout
end
+ # No data bearing servers known - perform server selection to try to
+ # get a response from at least one of them, to return an accurate
+ # assessment of whether sessions are currently supported.
begin
ServerSelector.get(mode: :primary_preferred).select_server(self)
!!topology.logical_session_timeout
rescue Error::NoServerAvailable
- false
+ # We haven't been able to contact any servers - use last known
+      # value for session support.
+ @sessions_supported || false
end
end
+
+ private
# @api private
def start_stop_srv_monitor
# SRV URI is either always given or not for a given cluster, if one
# wasn't given we shouldn't ever have an SRV monitor to manage.