lib/mongo/cluster.rb in mongo-2.15.1 vs lib/mongo/cluster.rb in mongo-2.16.0.alpha1
- old
+ new
@@ -157,10 +157,14 @@
# @sdam_flow_lock covers just the sdam flow. Note it does not apply
# to @topology replacements which are done under @update_lock.
@sdam_flow_lock = Mutex.new
Session::SessionPool.create(self)
+ if seeds.empty? && load_balanced?
+ raise ArgumentError, 'Load-balanced clusters with no seeds are prohibited'
+ end
+
# The opening topology is always unknown with no servers.
# https://github.com/mongodb/specifications/pull/388
opening_topology = Topology::Unknown.new(options, monitoring, self)
publish_sdam_event(
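Usage-level illustration of the new guard: in load-balanced mode the cluster must be given at least one seed, or the constructor raises ArgumentError. A minimal sketch assuming the public Mongo::Client API; the host and database names are made up.

    require 'mongo'

    # One seed pointing at the load balancer; an empty seed list combined
    # with load-balanced mode is now rejected during construction.
    client = Mongo::Client.new(
      ['lb.example.com:27017'],
      connect: :load_balanced,
      database: 'test'
    )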
@@ -169,25 +173,34 @@
)
@seeds = seeds = seeds.uniq
servers = seeds.map do |seed|
# Server opening events must be sent after topology change events.
- # Therefore separate server addition, done here before topoolgy change
+ # Therefore separate server addition, done here before topology change
# event is published, from starting to monitor the server which is
# done later.
add(seed, monitor: false)
end
if seeds.size >= 1
# Recreate the topology to get the current server list into it
- @topology = topology.class.new(topology.options, topology.monitoring, self)
- publish_sdam_event(
- Monitoring::TOPOLOGY_CHANGED,
- Monitoring::Event::TopologyChanged.new(opening_topology, @topology)
- )
+ recreate_topology(topology, opening_topology)
end
+ if load_balanced?
+       # We are required by the specifications to produce certain SDAM events
+       # when in the load-balanced topology.
+ # These events don't make a lot of sense from the standpoint of the
+ # driver's SDAM implementation, nor from the standpoint of the
+ # driver's load balancer implementation.
+ # They are just required boilerplate.
+ #
+ # Note that this call must be done above the monitoring_io check
+ # because that short-circuits the rest of the constructor.
+ fabricate_lb_sdam_events_and_set_server_type
+ end
+
if options[:monitoring_io] == false
# Omit periodic executor construction, because without servers
# no commands can be sent to the cluster and there shouldn't ever
# be anything that needs to be cleaned up.
#
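The boilerplate events mentioned in the comment above can be observed with an ordinary SDAM subscriber. A sketch assuming the driver's global monitoring API (SDAM events are delivered to a subscriber's #succeeded method); the host name is made up.

    require 'mongo'

    # Minimal SDAM subscriber: the monitoring layer invokes #succeeded with
    # each published SDAM event.
    class SdamPrinter
      def succeeded(event)
        puts event.class.name
      end
    end

    printer = SdamPrinter.new
    [
      Mongo::Monitoring::SERVER_OPENING,
      Mongo::Monitoring::SERVER_DESCRIPTION_CHANGED,
      Mongo::Monitoring::TOPOLOGY_CHANGED,
    ].each do |topic|
      Mongo::Monitoring::Global.subscribe(topic, printer)
    end

    # In load-balanced mode the constructor itself publishes the server
    # opening, server description changed and topology changed events,
    # even though no monitoring connection is ever opened.
    Mongo::Client.new(['lb.example.com:27017'], connect: :load_balanced)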
@@ -201,67 +214,69 @@
# Update instance variables prior to starting monitoring threads.
@connecting = false
@connected = true
if options[:cleanup] != false
- @cursor_reaper = CursorReaper.new
+ @cursor_reaper = CursorReaper.new(self)
@socket_reaper = SocketReaper.new(self)
@periodic_executor = PeriodicExecutor.new([
@cursor_reaper, @socket_reaper,
], options)
@periodic_executor.run!
end
- # Need to record start time prior to starting monitoring
- start_monotime = Utils.monotonic_time
+ unless load_balanced?
+ # Need to record start time prior to starting monitoring
+ start_monotime = Utils.monotonic_time
- servers.each do |server|
- server.start_monitoring
- end
-
- if options[:scan] != false
- server_selection_timeout = options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT
- # The server selection timeout can be very short especially in
- # tests, when the client waits for a synchronous scan before
- # starting server selection. Limiting the scan to server selection time
- # then aborts the scan before it can process even local servers.
- # Therefore, allow at least 3 seconds for the scan here.
- if server_selection_timeout < 3
- server_selection_timeout = 3
+ servers.each do |server|
+ server.start_monitoring
end
- deadline = start_monotime + server_selection_timeout
- # Wait for the first scan of each server to complete, for
- # backwards compatibility.
- # If any servers are discovered during this SDAM round we are going to
- # wait for these servers to also be queried, and so on, up to the
- # server selection timeout or the 3 second minimum.
- loop do
- # Ensure we do not try to read the servers list while SDAM is running
- servers = @sdam_flow_lock.synchronize do
- servers_list.dup
+
+ if options[:scan] != false
+ server_selection_timeout = options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT
+ # The server selection timeout can be very short especially in
+ # tests, when the client waits for a synchronous scan before
+ # starting server selection. Limiting the scan to server selection time
+ # then aborts the scan before it can process even local servers.
+ # Therefore, allow at least 3 seconds for the scan here.
+ if server_selection_timeout < 3
+ server_selection_timeout = 3
end
- if servers.all? { |server| server.last_scan_monotime && server.last_scan_monotime >= start_monotime }
- break
+ deadline = start_monotime + server_selection_timeout
+ # Wait for the first scan of each server to complete, for
+ # backwards compatibility.
+ # If any servers are discovered during this SDAM round we are going to
+ # wait for these servers to also be queried, and so on, up to the
+ # server selection timeout or the 3 second minimum.
+ loop do
+ # Ensure we do not try to read the servers list while SDAM is running
+ servers = @sdam_flow_lock.synchronize do
+ servers_list.dup
+ end
+ if servers.all? { |server| server.last_scan_monotime && server.last_scan_monotime >= start_monotime }
+ break
+ end
+ if (time_remaining = deadline - Utils.monotonic_time) <= 0
+ break
+ end
+ log_debug("Waiting for up to #{'%.2f' % time_remaining} seconds for servers to be scanned: #{summary}")
+ # Since the semaphore may have been signaled between us checking
+ # the servers list above and the wait call below, we should not
+ # wait for the full remaining time - wait for up to 1 second, then
+ # recheck the state.
+ begin
+ server_selection_semaphore.wait([time_remaining, 1].min)
+ rescue ::Timeout::Error
+ # nothing
+ end
end
- if (time_remaining = deadline - Utils.monotonic_time) <= 0
- break
- end
- log_debug("Waiting for up to #{'%.2f' % time_remaining} seconds for servers to be scanned: #{summary}")
- # Since the semaphore may have been signaled between us checking
- # the servers list above and the wait call below, we should not
- # wait for the full remaining time - wait for up to 1 second, then
- # recheck the state.
- begin
- server_selection_semaphore.wait([time_remaining, 1].min)
- rescue ::Timeout::Error
- # nothing
- end
end
- end
- start_stop_srv_monitor
+ start_stop_srv_monitor
+ end
end
# Create a cluster for the provided client, for use when we don't want the
# client's original cluster instance to be the same.
#
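The scan-wait loop above combines a deadline (at least 3 seconds), a semaphore and a 1-second recheck interval, so that a signal missed between the list check and the wait call cannot stall the constructor. A standalone sketch of the pattern; the names below are illustrative, not driver API.

    require 'monitor'

    # Stand-in for the driver's semaphore: a condition variable that is
    # signaled when new scan results arrive and can be waited on with a
    # timeout.
    class SimpleSemaphore
      def initialize
        @lock = Monitor.new
        @cond = @lock.new_cond
      end

      def signal
        @lock.synchronize { @cond.signal }
      end

      def wait(timeout)
        @lock.synchronize { @cond.wait(timeout) }
      end
    end

    # Wait until the block returns true, giving up after `timeout` seconds
    # (never less than 3) and rechecking at least once per second.
    def wait_until(semaphore, timeout)
      timeout = 3 if timeout < 3
      deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
      loop do
        break if yield
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        semaphore.wait([remaining, 1].min)
      end
    end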
@@ -319,10 +334,18 @@
attr_reader :session_pool
def_delegators :topology, :replica_set?, :replica_set_name, :sharded?,
:single?, :unknown?
+ # Returns whether the cluster is configured to be in the load-balanced
+ # topology.
+ #
+ # @return [ true | false ] Whether the topology is load-balanced.
+ def load_balanced?
+ topology.is_a?(Topology::LoadBalanced)
+ end
+
[:register_cursor, :schedule_kill_cursor, :unregister_cursor].each do |m|
define_method(m) do |*args|
if options[:cleanup] != false
@cursor_reaper.send(m, *args)
end
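The define_method loop above is a guard-then-forward delegation: each cursor-reaper method is defined on the cluster but only forwards to the reaper when cleanup is enabled. A generic sketch of the pattern with illustrative names.

    # Guard-then-forward delegation via define_method.
    class Owner
      def initialize(worker, cleanup: true)
        @worker = worker
        @cleanup = cleanup
      end

      [:register, :unregister].each do |m|
        define_method(m) do |*args|
          @worker.send(m, *args) if @cleanup
        end
      end
    end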
@@ -597,13 +620,29 @@
# respective server is cleared. Set this option to true to keep the
# existing connection pool (required when handling not master errors
# on 4.2+ servers).
# @option options [ true | false ] :awaited Whether the updated description
# was a result of processing an awaited hello.
+ # @option options [ Object ] :service_id Change state for the specified
+ # service id only.
#
# @api private
def run_sdam_flow(previous_desc, updated_desc, options = {})
+ if load_balanced?
+ if updated_desc.config.empty?
+ unless options[:keep_connection_pool]
+ servers_list.each do |server|
+ # TODO should service id be taken out of updated_desc?
+ # We could also assert that
+ # options[:service_id] == updated_desc.service_id
+ server.clear_connection_pool(service_id: options[:service_id])
+ end
+ end
+ end
+ return
+ end
+
@sdam_flow_lock.synchronize do
flow = SdamFlow.new(self, previous_desc, updated_desc,
awaited: options[:awaited])
flow.server_description_changed
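In load-balanced mode there is no SDAM flow to run, so the only reaction to an unknown (empty-config) description is dropping pooled connections tied to the failing service id. An illustrative sketch of service-id-scoped clearing; this is not the driver's ConnectionPool API, and connections here are assumed to respond to #service_id and #close.

    class ServiceScopedPool
      def initialize(connections = [])
        @connections = connections
      end

      # With a service_id, close only that service's connections; with nil,
      # close everything, mirroring an unscoped pool clear.
      def clear(service_id: nil)
        doomed, kept = @connections.partition do |conn|
          service_id.nil? || conn.service_id == service_id
        end
        doomed.each(&:close)
        @connections = kept
        doomed.size
      end
    end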
@@ -780,12 +819,19 @@
#
# @since 2.0.0
def add(host, add_options=nil)
address = Address.new(host, options)
if !addresses.include?(address)
- server = Server.new(address, self, @monitoring, event_listeners, options.merge(
- monitor: false))
+ opts = options.merge(monitor: false)
+ # Note that in a load-balanced topology, every server must be a
+ # load balancer (load_balancer: true is specified in the options)
+       # but this option isn't set here because the specifications require us
+       # to pretend the server started out as an unknown one and to publish a
+       # server description change event transitioning it to the load balancer
+       # one. The correct description for this server will be set later by the
+       # fabricate_lb_sdam_events_and_set_server_type method.
+ server = Server.new(address, self, @monitoring, event_listeners, opts)
@update_lock.synchronize do
# Need to recheck whether server is present in @servers, because
# the previous check was not under a lock.
# Since we are under the update lock here, we cannot call servers_list.
return if @servers.map(&:address).include?(address)
@@ -897,10 +943,14 @@
# least one server. If the client has never connected to any servers,
# the deployment is considered to not support sessions.
#
# @api private
def validate_session_support!
+ if topology.is_a?(Topology::LoadBalanced)
+ return
+ end
+
@state_change_lock.synchronize do
@sdam_flow_lock.synchronize do
if topology.data_bearing_servers?
unless topology.logical_session_timeout
raise_sessions_not_supported
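Since session-support validation is skipped for load-balanced topologies, session usage at the client level is unchanged. A minimal sketch assuming the public session API; the host and database names are made up.

    require 'mongo'

    client = Mongo::Client.new(
      ['lb.example.com:27017'],
      connect: :load_balanced,
      database: 'test'
    )

    # Explicit session used for a single write, then released.
    session = client.start_session
    client[:events].insert_one({ type: 'ping' }, session: session)
    session.end_session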
@@ -979,9 +1029,44 @@
else
"The following servers have null logical session timeout: #{offending_servers.map(&:address).map(&:seed).join(', ')}"
end
msg = "The deployment that the driver is connected to does not support sessions: #{reason}"
raise Error::SessionsNotSupported, msg
+ end
+
+   def fabricate_lb_sdam_events_and_set_server_type
+     # Although there is no monitoring connection in load-balanced mode,
+     # we must emit the following series of SDAM events.
+     server = @servers.first
+     # We are guaranteed to have a server here because the constructor
+     # rejects load-balanced clusters with no seeds.
+ server.publish_opening_event
+ server_desc = server.description
+ # This is where a load balancer actually gets its correct server
+ # description.
+ server.update_description(
+ Server::Description.new(server.address, {},
+ load_balancer: true,
+ force_load_balancer: options[:connect] == :load_balanced,
+ )
+ )
+ publish_sdam_event(
+ Monitoring::SERVER_DESCRIPTION_CHANGED,
+ Monitoring::Event::ServerDescriptionChanged.new(
+ server.address,
+ topology,
+ server_desc,
+ server.description
+ )
+ )
+ recreate_topology(topology, topology)
+ end
+
+ def recreate_topology(new_topology_template, previous_topology)
+ @topology = topology.class.new(new_topology_template.options, new_topology_template.monitoring, self)
+ publish_sdam_event(
+ Monitoring::TOPOLOGY_CHANGED,
+ Monitoring::Event::TopologyChanged.new(previous_topology, @topology)
+ )
end
end
end
require 'mongo/cluster/sdam_flow'
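recreate_topology above is a replace-and-notify helper: build a successor from a template, swap it into place, and publish a change event carrying both the old and new values. A generic sketch of the pattern with illustrative names; the template's class is assumed to have a zero-argument constructor.

    # Listeners receive the previous and the newly created value, mirroring
    # how TopologyChanged carries both topologies.
    ChangeEvent = Struct.new(:previous, :current)

    class Holder
      attr_reader :value

      def initialize(value, listeners = [])
        @value = value
        @listeners = listeners
      end

      # Recreate @value as a fresh instance of the template's class and
      # notify listeners of the transition from `previous` to the new value.
      def recreate(template, previous)
        @value = template.class.new
        @listeners.each { |listener| listener.call(ChangeEvent.new(previous, @value)) }
        @value
      end
    end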