# frozen_string_literal: true
# rubocop:todo all

# Copyright (C) 2018-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

class Mongo::Cluster
  # Handles SDAM flow for a server description changed event.
  #
  # Updates server descriptions, topology descriptions and publishes
  # SDAM events.
  #
  # SdamFlow is meant to be instantiated once for every server description
  # changed event that needs to be processed.
  #
  # @api private
  class SdamFlow
    extend Forwardable

    # @param [ Mongo::Cluster ] cluster The cluster whose servers and
    #   topology this flow updates.
    # @param [ Mongo::Server::Description ] previous_desc The description the
    #   server had before the change being processed.
    # @param [ Mongo::Server::Description ] updated_desc The newly received
    #   description to process.
    # @param [ true | false ] awaited Forwarded as the `awaited` flag of the
    #   ServerDescriptionChanged events this flow publishes.
    def initialize(cluster, previous_desc, updated_desc, awaited: false)
      @cluster = cluster
      @topology = cluster.topology
      @original_desc = @previous_desc = previous_desc
      @updated_desc = updated_desc
      # Servers removed during the flow; they are disconnected only at the
      # very end (see do_remove for why).
      @servers_to_disconnect = []
      @awaited = !!awaited
    end

    attr_reader :cluster

    def_delegators :cluster, :servers_list, :seeds, :publish_sdam_event, :log_warn

    # The topology stored in this attribute can change multiple times throughout
    # a single sdam flow (e.g. unknown -> RS no primary -> RS with primary).
    # Events for topology change get sent at the end of flow processing,
    # such that the above example only publishes an unknown -> RS with primary
    # event to the application.
    #
    # @return Mongo::Cluster::Topology The current topology.
    attr_reader :topology

    attr_reader :previous_desc
    attr_reader :updated_desc
    attr_reader :original_desc

    # @return [ true | false ] Whether the processed description came from an
    #   awaited check.
    def awaited?
      @awaited
    end

    def_delegators :topology, :replica_set_name

    # Updates the description of the first server in the cluster whose
    # address matches updated_desc's address.
    #
    # Also records in @server_description_changed whether the new
    # description differs from the server's previous one.
    #
    # @return [ true | false ] true if a matching server was found and its
    #   description was updated; false if no server has that address, or if
    #   updated_desc's topology version is older than the server's current one.
    def update_server_descriptions
      servers_list.each do |server|
        if server.address == updated_desc.address
          # SDAM flow must be run when topology version in the new description
          # is equal to the current topology version, per the example in
          # https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#what-is-the-purpose-of-topologyversion
          unless updated_desc.topology_version_gte?(server.description)
            return false
          end

          @server_description_changed = server.description != updated_desc

          # Always update server description, so that fields that do not
          # affect description equality comparisons but are part of the
          # description are updated.
          server.update_description(updated_desc)
          server.update_last_scan

          # If there was no content difference between descriptions, we
          # still need to run sdam flow, but if the flow produces no change
          # in topology we will omit sending events.
          return true
        end
      end
      false
    end

    # Entry point: processes updated_desc against the current topology type,
    # performing the topology transitions required by SDAM, then publishes
    # events (commit_changes) and disconnects servers removed during the flow.
    #
    # @raise [ ArgumentError ] If the current topology class is not one this
    #   flow knows how to handle.
    def server_description_changed
      # Snapshot taken before any change, compared against in
      # topology_effectively_changed?.
      @previous_server_descriptions = servers_list.map do |server|
        [server.address.to_s, server.description]
      end

      unless update_server_descriptions
        # All of the transitions require that server whose updated_desc we are
        # processing is still in the cluster (i.e., was not removed as a result
        # of processing another response, potentially concurrently).
        # If update_server_descriptions returned false we have no servers
        # in the topology for the description we are processing, stop.
        return
      end

      case topology
      when Topology::LoadBalanced
        @updated_desc = ::Mongo::Server::Description::LoadBalancer.new(
          updated_desc.address,
        )
        update_server_descriptions
      when Topology::Single
        if topology.replica_set_name
          if updated_desc.replica_set_name != topology.replica_set_name
            log_warn(
              "Server #{updated_desc.address.to_s} has an incorrect replica set name '#{updated_desc.replica_set_name}'; expected '#{topology.replica_set_name}'"
            )
            @updated_desc = ::Mongo::Server::Description.new(
              updated_desc.address,
              {},
              average_round_trip_time: updated_desc.average_round_trip_time,
              minimum_round_trip_time: updated_desc.minimum_round_trip_time
            )
            update_server_descriptions
          end
        end
      when Topology::Unknown
        if updated_desc.standalone?
          update_unknown_with_standalone
        elsif updated_desc.mongos?
          @topology = Topology::Sharded.new(topology.options, topology.monitoring, self)
        elsif updated_desc.primary?
          @topology = Topology::ReplicaSetWithPrimary.new(
            topology.options.merge(replica_set_name: updated_desc.replica_set_name),
            topology.monitoring, self)
          update_rs_from_primary
        elsif updated_desc.secondary? || updated_desc.arbiter? || updated_desc.other?
          @topology = Topology::ReplicaSetNoPrimary.new(
            topology.options.merge(replica_set_name: updated_desc.replica_set_name),
            topology.monitoring, self)
          update_rs_without_primary
        end
      when Topology::Sharded
        unless updated_desc.unknown? || updated_desc.mongos?
          log_warn(
            "Removing server #{updated_desc.address.to_s} because it is of the wrong type (#{updated_desc.server_type.to_s.upcase}) - expected SHARDED"
          )
          remove
        end
      when Topology::ReplicaSetWithPrimary
        if updated_desc.standalone? || updated_desc.mongos?
          log_warn(
            "Removing server #{updated_desc.address.to_s} because it is of the wrong type (#{updated_desc.server_type.to_s.upcase}) - expected a replica set member"
          )
          remove
          check_if_has_primary
        elsif updated_desc.primary?
          update_rs_from_primary
        elsif updated_desc.secondary? || updated_desc.arbiter? || updated_desc.other?
          update_rs_with_primary_from_member
        else
          check_if_has_primary
        end
      when Topology::ReplicaSetNoPrimary
        if updated_desc.standalone? || updated_desc.mongos?
          log_warn(
            "Removing server #{updated_desc.address.to_s} because it is of the wrong type (#{updated_desc.server_type.to_s.upcase}) - expected a replica set member"
          )
          remove
        elsif updated_desc.primary?
          # Here we change topology type to RS with primary, however
          # while processing updated_desc we may find that its RS name
          # does not match our existing RS name. For this reason
          # it is imperative to NOT pass updated_desc's RS name to
          # topology constructor here.
          # During processing we may remove the server whose updated_desc
          # we are processing (e.g. the RS name mismatch case again),
          # in which case topology type will go back to RS without primary
          # in the check_if_has_primary step.
          @topology = Topology::ReplicaSetWithPrimary.new(
            # Do not pass updated_desc's RS name here
            topology.options,
            topology.monitoring, self)
          update_rs_from_primary
        elsif updated_desc.secondary? || updated_desc.arbiter? || updated_desc.other?
          update_rs_without_primary
        end
      else
        raise ArgumentError, "Unknown topology #{topology.class}"
      end

      verify_invariants
      commit_changes
      disconnect_servers
    end

    # Transitions from unknown to single topology type, when a standalone
    # server is discovered. If more than one seed was configured, the
    # standalone is instead removed from the topology.
    def update_unknown_with_standalone
      if seeds.length == 1
        @topology = Topology::Single.new(
          topology.options, topology.monitoring, self)
      else
        log_warn(
          "Removing server #{updated_desc.address.to_s} because it is a standalone and we have multiple seeds (#{seeds.length})"
        )
        remove
      end
    end

    # Updates topology which must be a ReplicaSetWithPrimary with information
    # from the primary's server description.
    #
    # This method does not change topology type to ReplicaSetWithPrimary -
    # this needs to have been done prior to calling this method.
    #
    # If the primary whose description is being processed is determined to be
    # stale, this method will change the server description and topology
    # type to unknown.
    def update_rs_from_primary
      if topology.replica_set_name.nil?
        @topology = Topology::ReplicaSetWithPrimary.new(
          topology.options.merge(replica_set_name: updated_desc.replica_set_name),
          topology.monitoring, self)
      end

      if topology.replica_set_name != updated_desc.replica_set_name
        log_warn(
          "Removing server #{updated_desc.address.to_s} because it has an " +
          "incorrect replica set name '#{updated_desc.replica_set_name}'; " +
          "expected '#{topology.replica_set_name}'"
        )
        remove
        check_if_has_primary
        return
      end

      if stale_primary?
        # Replace the stale primary's description with an empty (unknown)
        # one, keeping its round trip time statistics.
        @updated_desc = ::Mongo::Server::Description.new(
          updated_desc.address,
          {},
          average_round_trip_time: updated_desc.average_round_trip_time,
          minimum_round_trip_time: updated_desc.minimum_round_trip_time
        )
        update_server_descriptions
        check_if_has_primary
        return
      end

      if updated_desc.max_wire_version >= 17
        # The description was already checked not to be stale, so its
        # election id and set version become the new maximums.
        @topology = Topology::ReplicaSetWithPrimary.new(
          topology.options.merge(
            max_election_id: updated_desc.election_id,
            max_set_version: updated_desc.set_version
          ), topology.monitoring, self)
      else
        max_election_id = topology.new_max_election_id(updated_desc)
        max_set_version = topology.new_max_set_version(updated_desc)

        if max_election_id != topology.max_election_id ||
          max_set_version != topology.max_set_version
          @topology = Topology::ReplicaSetWithPrimary.new(
            topology.options.merge(
              max_election_id: max_election_id,
              max_set_version: max_set_version
            ), topology.monitoring, self)
        end
      end

      # At this point we have accepted the updated server description
      # and the topology (both are primary). Commit these changes so that
      # their respective SDAM events are published before SDAM events for
      # server additions/removals that follow
      publish_description_change_event

      # Any other server still marked primary is demoted to an unknown
      # description. Its own round trip time statistics are preserved
      # (previously the minimum RTT was mistakenly taken from the new
      # primary's description).
      servers_list.each do |server|
        if server.address != updated_desc.address
          if server.primary?
            server.update_description(
              ::Mongo::Server::Description.new(
                server.address,
                {},
                average_round_trip_time: server.description.average_round_trip_time,
                minimum_round_trip_time: server.description.minimum_round_trip_time
              )
            )
          end
        end
      end

      servers = add_servers_from_desc(updated_desc)
      remove_servers_not_in_desc(updated_desc)

      check_if_has_primary

      servers.each do |server|
        server.start_monitoring
      end
    end

    # Updates a ReplicaSetWithPrimary topology from a non-primary member.
    #
    # Removes the member if its replica set name does not match the
    # topology's or if it reports a different "me" address; otherwise
    # downgrades the topology to ReplicaSetNoPrimary when no server in the
    # cluster is a primary.
    def update_rs_with_primary_from_member
      if topology.replica_set_name != updated_desc.replica_set_name
        log_warn(
          "Removing server #{updated_desc.address.to_s} because it has an " +
          "incorrect replica set name (#{updated_desc.replica_set_name}); " +
          "current set name is #{topology.replica_set_name}"
        )
        remove
        check_if_has_primary
        return
      end

      if updated_desc.me_mismatch?
        log_warn(
          "Removing server #{updated_desc.address.to_s} because it " +
          "reported itself as #{updated_desc.me}"
        )
        remove
        check_if_has_primary
        return
      end

      unless servers_list.any?(&:primary?)
        @topology = Topology::ReplicaSetNoPrimary.new(
          topology.options, topology.monitoring, self)
      end
    end

    # Updates a ReplicaSetNoPrimary topology from a non-primary member.
    #
    # Adopts the member's replica set name if the topology has none, removes
    # the member on a set name mismatch, otherwise adds (and starts
    # monitoring) the hosts it reports. A member reporting a different "me"
    # address is removed after its hosts have been added.
    def update_rs_without_primary
      if topology.replica_set_name.nil?
        @topology = Topology::ReplicaSetNoPrimary.new(
          topology.options.merge(replica_set_name: updated_desc.replica_set_name),
          topology.monitoring, self)
      end

      if topology.replica_set_name != updated_desc.replica_set_name
        log_warn(
          "Removing server #{updated_desc.address.to_s} because it has an " +
          "incorrect replica set name (#{updated_desc.replica_set_name}); " +
          "current set name is #{topology.replica_set_name}"
        )
        remove
        return
      end

      publish_description_change_event

      servers = add_servers_from_desc(updated_desc)

      commit_changes

      servers.each do |server|
        server.start_monitoring
      end

      if updated_desc.me_mismatch?
        log_warn(
          "Removing server #{updated_desc.address.to_s} because it " +
          "reported itself as #{updated_desc.me}"
        )
        remove
        return
      end
    end

    # Adds all servers referenced in the given description (which is
    # supposed to have come from a good primary) which are not
    # already in the cluster, to the cluster.
    #
    # @note Servers are added unmonitored. Monitoring must be started later
    # separately.
    #
    # @return [ Array<Server> ] Servers actually added to the cluster.
    #   This is the set of servers on which monitoring should be started.
    def add_servers_from_desc(updated_desc)
      added_servers = []
      %w(hosts passives arbiters).each do |m|
        updated_desc.send(m).each do |address_str|
          # cluster.add returns a falsy value when nothing was added.
          if server = cluster.add(address_str, monitor: false)
            added_servers << server
          end
        end
      end

      verify_invariants

      added_servers
    end

    # Removes servers from the topology which are not present in the
    # given server description (which is supposed to have come from a
    # good primary).
    def remove_servers_not_in_desc(updated_desc)
      updated_desc_address_strs = %w(hosts passives arbiters).flat_map do |m|
        updated_desc.send(m)
      end
      servers_list.each do |server|
        unless updated_desc_address_strs.include?(address_str = server.address.to_s)
          updated_host = updated_desc.address.to_s
          if updated_desc.me && updated_desc.me != updated_host
            updated_host += " (self-identified as #{updated_desc.me})"
          end
          log_warn(
            "Removing server #{address_str} because it is not in hosts reported by primary " +
            "#{updated_host}. Reported hosts are: " +
            updated_desc.hosts.join(', ')
          )
          do_remove(address_str)
        end
      end
    end

    # Removes the server whose description we are processing from the
    # topology, publishing its description change event first.
    def remove
      publish_description_change_event
      do_remove(updated_desc.address.to_s)
    end

    # Removes specified server from topology and warns if the topology ends
    # up with an empty server list as a result.
    #
    # The removed servers are queued in @servers_to_disconnect and are only
    # disconnected by disconnect_servers at the end of the flow.
    def do_remove(address_str)
      servers = cluster.remove(address_str, disconnect: false)
      servers.each do |server|
        # Clear the description so that the server is marked as unknown.
        server.clear_description

        # We need to publish server closed event here, but we cannot close
        # the server because it could be the server owning the monitor in
        # whose thread this flow is presently executing, in which case closing
        # the server can terminate the thread and leave SDAM processing
        # incomplete. Thus we have to remove the server from the cluster,
        # publish the event, but do not call disconnect on the server until
        # the very end when all processing has completed.
        publish_sdam_event(
          Mongo::Monitoring::SERVER_CLOSED,
          Mongo::Monitoring::Event::ServerClosed.new(server.address, cluster.topology)
        )
      end
      @servers_to_disconnect += servers
      if servers_list.empty?
        log_warn(
          "Topology now has no servers - this is likely a misconfiguration of the cluster and/or the application"
        )
      end
    end

    # Publishes a ServerDescriptionChanged event for the server being
    # processed, if anything changed since the last time this method
    # published one, and flags that a topology changed event is needed.
    def publish_description_change_event
      # This method may be invoked when server description definitely changed
      # but prior to the topology getting updated. Therefore we check both
      # server description changes and overall topology changes. When this
      # method is called at the end of SDAM flow as part of "commit changes"
      # step, server description change is incorporated into the topology
      # change.
      unless @server_description_changed || topology_effectively_changed?
        return
      end

      # updated_desc here may not be the description we received from
      # the server - in case of a stale primary, the server reported itself
      # as being a primary but updated_desc here will be unknown.

      # We used to not notify on Unknown -> Unknown server changes.
      # Technically these are valid state changes (or at least as valid as
      # other server description changes when the description has not
      # changed meaningfully but the events are still published).
      # The current version of the driver notifies on Unknown -> Unknown
      # transitions.

      # Avoid dispatching events when updated description is the same as
      # previous description. This allows this method to be called multiple
      # times in the flow when the events should be published, without
      # worrying about whether there are any unpublished changes.
      if updated_desc.object_id == previous_desc.object_id
        return
      end

      publish_sdam_event(
        ::Mongo::Monitoring::SERVER_DESCRIPTION_CHANGED,
        ::Mongo::Monitoring::Event::ServerDescriptionChanged.new(
          updated_desc.address,
          topology,
          previous_desc,
          updated_desc,
          awaited: awaited?,
        )
      )
      @previous_desc = updated_desc
      @need_topology_changed_event = true
    end

    # Publishes server description changed events, updates topology on
    # the cluster and publishes topology changed event, as needed
    # based on operations performed during SDAM flow processing.
    def commit_changes
      # The application-visible sequence of events should be as follows:
      #
      # 1. Description change for the server which we are processing;
      # 2. Topology change, if any;
      # 3. Description changes for other servers, if any.
      #
      # The tricky part here is that the server description changes are
      # not all processed together.

      publish_description_change_event
      start_pool_if_data_bearing

      topology_changed_event_published = false
      if !topology.equal?(cluster.topology) || @need_topology_changed_event
        # We are about to publish topology changed event.
        # Recreate the topology instance to get its server descriptions
        # up to date.
        @topology = topology.class.new(topology.options, topology.monitoring, cluster)
        # This sends the SDAM event
        cluster.update_topology(topology)
        topology_changed_event_published = true
        @need_topology_changed_event = false
      end

      # If a server description changed, topology description change event
      # must be published with the previous and next topologies being of
      # the same type, unless we already published topology change event
      if topology_changed_event_published
        return
      end

      if updated_desc.unknown? && previous_desc.unknown?
        return
      end
      if updated_desc.object_id == previous_desc.object_id
        return
      end

      unless topology_effectively_changed?
        return
      end

      # If we are here, there has been a change in the server descriptions
      # in our topology, but topology class has not changed.
      # Publish the topology changed event and recreate the topology to
      # get the new list of server descriptions into it.
      @topology = topology.class.new(topology.options, topology.monitoring, cluster)
      # This sends the SDAM event
      cluster.update_topology(topology)
    end

    # Disconnects, at the end of the flow, the servers that were removed
    # during it (see do_remove). The server closed event is not published
    # here because do_remove already published it.
    def disconnect_servers
      while server = @servers_to_disconnect.shift
        if server.connected?
          # Do not publish server closed event, as this was already done
          server.disconnect!
        end
      end
    end

    # If the server being processed is identified as data bearing, creates the
    # server's connection pool so it can start populating
    def start_pool_if_data_bearing
      return if !updated_desc.data_bearing?

      servers_list.each do |server|
        if server.address == updated_desc.address
          server.pool
        end
      end
    end

    # Checks if the cluster has a primary, and if not, transitions the topology
    # to ReplicaSetNoPrimary. A server reporting itself as primary only counts
    # if its replica set name matches the topology's.
    #
    # @raise [ ArgumentError ] If the current topology is not a replica set
    #   topology (either ReplicaSetWithPrimary or ReplicaSetNoPrimary).
    def check_if_has_primary
      unless topology.replica_set?
        raise ArgumentError, "check_if_has_primary should only be called when topology is replica set, but it is #{topology.class.name.sub(/.*::/, '')}"
      end

      primary = servers_list.detect do |server|
        # A primary with the wrong set name is not a primary
        server.primary? && server.description.replica_set_name == topology.replica_set_name
      end
      unless primary
        @topology = Topology::ReplicaSetNoPrimary.new(
          topology.options, topology.monitoring, self)
      end
    end

    # Whether updated_desc is for a stale primary.
    #
    # With max wire version 17 or higher, the election id is compared first
    # and the set version is only consulted when election ids are equal.
    # With older wire versions, the (set version, election id) pair is
    # compared, and only when the description carries both values.
    def stale_primary?
      if updated_desc.max_wire_version >= 17
        if updated_desc.election_id.nil? && !topology.max_election_id.nil?
          return true
        end
        if updated_desc.election_id && topology.max_election_id && updated_desc.election_id < topology.max_election_id
          return true
        end
        if updated_desc.election_id == topology.max_election_id
          if updated_desc.set_version.nil? && !topology.max_set_version.nil?
            return true
          end
          if updated_desc.set_version && topology.max_set_version && updated_desc.set_version < topology.max_set_version
            return true
          end
        end
      else
        if updated_desc.election_id && updated_desc.set_version
          if topology.max_set_version && topology.max_election_id &&
              (updated_desc.set_version < topology.max_set_version ||
                (updated_desc.set_version == topology.max_set_version &&
                  updated_desc.election_id < topology.max_election_id))
            return true
          end
        end
      end
      false
    end

    # Returns whether the server whose description this flow processed
    # was not previously unknown, and is now. Used to decide, in particular,
    # whether to clear the server's connection pool.
    def became_unknown?
      updated_desc.unknown? && !original_desc.unknown?
    end

    # Returns whether topology meaningfully changed as a result of running
    # SDAM flow.
    #
    # The spec defines topology equality through equality of topology types
    # and server descriptions in each topology; this definition is not usable
    # by us because our topology objects do not hold server descriptions and
    # are instead "live". Thus we have to store the full list of server
    # descriptions at the beginning of SDAM flow and compare them to the
    # current ones.
    def topology_effectively_changed?
      unless topology.equal?(cluster.topology)
        return true
      end

      server_descriptions = servers_list.map do |server|
        [server.address.to_s, server.description]
      end

      @previous_server_descriptions != server_descriptions
    end

    # When Mongo::Lint is enabled, raises if the cluster's topology is
    # single but the cluster holds more than one server.
    #
    # @raise [ Mongo::Error::LintError ] On an invariant violation in lint mode.
    def verify_invariants
      if Mongo::Lint.enabled?
        if cluster.topology.single?
          if cluster.servers_list.length > 1
            raise Mongo::Error::LintError, "Trying to create a single topology with multiple servers: #{cluster.servers_list}"
          end
        end
      end
    end
  end
end