lib/mongo/cluster.rb in mongo-2.15.1 vs lib/mongo/cluster.rb in mongo-2.16.0.alpha1

- old
+ new

@@ -157,10 +157,14 @@ # @sdam_flow_lock covers just the sdam flow. Note it does not apply # to @topology replacements which are done under @update_lock. @sdam_flow_lock = Mutex.new Session::SessionPool.create(self) + if seeds.empty? && load_balanced? + raise ArgumentError, 'Load-balanced clusters with no seeds are prohibited' + end + # The opening topology is always unknown with no servers. # https://github.com/mongodb/specifications/pull/388 opening_topology = Topology::Unknown.new(options, monitoring, self) publish_sdam_event( @@ -169,25 +173,34 @@ ) @seeds = seeds = seeds.uniq servers = seeds.map do |seed| # Server opening events must be sent after topology change events. - # Therefore separate server addition, done here before topoolgy change + # Therefore separate server addition, done here before topology change # event is published, from starting to monitor the server which is # done later. add(seed, monitor: false) end if seeds.size >= 1 # Recreate the topology to get the current server list into it - @topology = topology.class.new(topology.options, topology.monitoring, self) - publish_sdam_event( - Monitoring::TOPOLOGY_CHANGED, - Monitoring::Event::TopologyChanged.new(opening_topology, @topology) - ) + recreate_topology(topology, opening_topology) end + if load_balanced? + # We are required by the specifications to produce certain SDAM events + # when in load-balanced topology. + # These events don't make a lot of sense from the standpoint of the + # driver's SDAM implementation, nor from the standpoint of the + # driver's load balancer implementation. + # They are just required boilerplate. + # + # Note that this call must be done above the monitoring_io check + # because that short-circuits the rest of the constructor. + fabricate_lb_sdam_events_and_set_server_type + end + if options[:monitoring_io] == false # Omit periodic executor construction, because without servers # no commands can be sent to the cluster and there shouldn't ever # be anything that needs to be cleaned up. # @@ -201,67 +214,69 @@ # Update instance variables prior to starting monitoring threads. @connecting = false @connected = true if options[:cleanup] != false - @cursor_reaper = CursorReaper.new + @cursor_reaper = CursorReaper.new(self) @socket_reaper = SocketReaper.new(self) @periodic_executor = PeriodicExecutor.new([ @cursor_reaper, @socket_reaper, ], options) @periodic_executor.run! end - # Need to record start time prior to starting monitoring - start_monotime = Utils.monotonic_time + unless load_balanced? + # Need to record start time prior to starting monitoring + start_monotime = Utils.monotonic_time - servers.each do |server| - server.start_monitoring - end - - if options[:scan] != false - server_selection_timeout = options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT - # The server selection timeout can be very short especially in - # tests, when the client waits for a synchronous scan before - # starting server selection. Limiting the scan to server selection time - # then aborts the scan before it can process even local servers. - # Therefore, allow at least 3 seconds for the scan here. - if server_selection_timeout < 3 - server_selection_timeout = 3 + servers.each do |server| + server.start_monitoring end - deadline = start_monotime + server_selection_timeout - # Wait for the first scan of each server to complete, for - # backwards compatibility. - # If any servers are discovered during this SDAM round we are going to - # wait for these servers to also be queried, and so on, up to the - # server selection timeout or the 3 second minimum. - loop do - # Ensure we do not try to read the servers list while SDAM is running - servers = @sdam_flow_lock.synchronize do - servers_list.dup + + if options[:scan] != false + server_selection_timeout = options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT + # The server selection timeout can be very short especially in + # tests, when the client waits for a synchronous scan before + # starting server selection. Limiting the scan to server selection time + # then aborts the scan before it can process even local servers. + # Therefore, allow at least 3 seconds for the scan here. + if server_selection_timeout < 3 + server_selection_timeout = 3 end - if servers.all? { |server| server.last_scan_monotime && server.last_scan_monotime >= start_monotime } - break + deadline = start_monotime + server_selection_timeout + # Wait for the first scan of each server to complete, for + # backwards compatibility. + # If any servers are discovered during this SDAM round we are going to + # wait for these servers to also be queried, and so on, up to the + # server selection timeout or the 3 second minimum. + loop do + # Ensure we do not try to read the servers list while SDAM is running + servers = @sdam_flow_lock.synchronize do + servers_list.dup + end + if servers.all? { |server| server.last_scan_monotime && server.last_scan_monotime >= start_monotime } + break + end + if (time_remaining = deadline - Utils.monotonic_time) <= 0 + break + end + log_debug("Waiting for up to #{'%.2f' % time_remaining} seconds for servers to be scanned: #{summary}") + # Since the semaphore may have been signaled between us checking + # the servers list above and the wait call below, we should not + # wait for the full remaining time - wait for up to 1 second, then + # recheck the state. + begin + server_selection_semaphore.wait([time_remaining, 1].min) + rescue ::Timeout::Error + # nothing + end end - if (time_remaining = deadline - Utils.monotonic_time) <= 0 - break - end - log_debug("Waiting for up to #{'%.2f' % time_remaining} seconds for servers to be scanned: #{summary}") - # Since the semaphore may have been signaled between us checking - # the servers list above and the wait call below, we should not - # wait for the full remaining time - wait for up to 1 second, then - # recheck the state. - begin - server_selection_semaphore.wait([time_remaining, 1].min) - rescue ::Timeout::Error - # nothing - end end - end - start_stop_srv_monitor + start_stop_srv_monitor + end end # Create a cluster for the provided client, for use when we don't want the # client's original cluster instance to be the same. # @@ -319,10 +334,18 @@ attr_reader :session_pool def_delegators :topology, :replica_set?, :replica_set_name, :sharded?, :single?, :unknown? + # Returns whether the cluster is configured to be in the load-balanced + # topology. + # + # @return [ true | false ] Whether the topology is load-balanced. + def load_balanced? + topology.is_a?(Topology::LoadBalanced) + end + [:register_cursor, :schedule_kill_cursor, :unregister_cursor].each do |m| define_method(m) do |*args| if options[:cleanup] != false @cursor_reaper.send(m, *args) end @@ -597,13 +620,29 @@ # respective server is cleared. Set this option to true to keep the # existing connection pool (required when handling not master errors # on 4.2+ servers). # @option aptions [ true | false ] :awaited Whether the updated description # was a result of processing an awaited hello. + # @option options [ Object ] :service_id Change state for the specified + # service id only. # # @api private def run_sdam_flow(previous_desc, updated_desc, options = {}) + if load_balanced? + if updated_desc.config.empty? + unless options[:keep_connection_pool] + servers_list.each do |server| + # TODO should service id be taken out of updated_desc? + # We could also assert that + # options[:service_id] == updated_desc.service_id + server.clear_connection_pool(service_id: options[:service_id]) + end + end + end + return + end + @sdam_flow_lock.synchronize do flow = SdamFlow.new(self, previous_desc, updated_desc, awaited: options[:awaited]) flow.server_description_changed @@ -780,12 +819,19 @@ # # @since 2.0.0 def add(host, add_options=nil) address = Address.new(host, options) if !addresses.include?(address) - server = Server.new(address, self, @monitoring, event_listeners, options.merge( - monitor: false)) + opts = options.merge(monitor: false) + # Note that in a load-balanced topology, every server must be a + # load balancer (load_balancer: true is specified in the options) + # but this option isn't set here because we are required by the + # specifications to pretent the server started out as an unknown one + # and publish server description change event into the load balancer + # one. The actual correct description for this server will be set + # by the fabricate_lb_sdam_events_and_set_server_type method. + server = Server.new(address, self, @monitoring, event_listeners, opts) @update_lock.synchronize do # Need to recheck whether server is present in @servers, because # the previous check was not under a lock. # Since we are under the update lock here, we cannot call servers_list. return if @servers.map(&:address).include?(address) @@ -897,10 +943,14 @@ # least one server. If the client has never connected to any servers, # the deployment is considered to not support sessions. # # @api private def validate_session_support! + if topology.is_a?(Topology::LoadBalanced) + return + end + @state_change_lock.synchronize do @sdam_flow_lock.synchronize do if topology.data_bearing_servers? unless topology.logical_session_timeout raise_sessions_not_supported @@ -979,9 +1029,44 @@ else "The following servers have null logical session timeout: #{offending_servers.map(&:address).map(&:seed).join(', ')}" end msg = "The deployment that the driver is connected to does not support sessions: #{reason}" raise Error::SessionsNotSupported, msg + end + + def fabricate_lb_sdam_events_and_set_server_type + # Although there is no monitoring connection in load balanced mode, + # we must emit the following series of SDAM events. + server = @servers.first + # We are guaranteed to have the server here. + server.publish_opening_event + server_desc = server.description + # This is where a load balancer actually gets its correct server + # description. + server.update_description( + Server::Description.new(server.address, {}, + load_balancer: true, + force_load_balancer: options[:connect] == :load_balanced, + ) + ) + publish_sdam_event( + Monitoring::SERVER_DESCRIPTION_CHANGED, + Monitoring::Event::ServerDescriptionChanged.new( + server.address, + topology, + server_desc, + server.description + ) + ) + recreate_topology(topology, topology) + end + + def recreate_topology(new_topology_template, previous_topology) + @topology = topology.class.new(new_topology_template.options, new_topology_template.monitoring, self) + publish_sdam_event( + Monitoring::TOPOLOGY_CHANGED, + Monitoring::Event::TopologyChanged.new(previous_topology, @topology) + ) end end end require 'mongo/cluster/sdam_flow'