lib/mongo/cluster/sdam_flow.rb in mongo-2.11.1 vs lib/mongo/cluster/sdam_flow.rb in mongo-2.11.2

- old
+ new

@@ -28,10 +28,11 @@ def initialize(cluster, previous_desc, updated_desc) @cluster = cluster @topology = cluster.topology @original_desc = @previous_desc = previous_desc @updated_desc = updated_desc + @servers_to_disconnect = [] end attr_reader :cluster def_delegators :cluster, :servers_list, :seeds, @@ -71,10 +72,64 @@ end false end def server_description_changed + if updated_desc.me_mismatch? && updated_desc.primary? && + (topology.unknown? || topology.replica_set?) + then + # When the driver receives a description claiming to be a primary, + # we are obligated by spec tests to add and remove hosts in that + # description even if it also has a me mismatch. The me mismatch + # scenario though presents a number of problems: + # + # 1. Effectively, the server's address changes, meaning we cannot + # update the description of the server whose description change we + # are processing (instead servers are added and removed), but we + # behave to an extent as if we are updating the description, which + # causes a bunch of awkwardness. + # 2. The server for which we are processing the response will be + # removed from topology, which may cause the current thread to terminate + # prior to running the entire sdam flow. To deal with this we separate + # the removal event publication from actually removing the server + # from topology, which again complicates the flow. + + # Primary-with-me-mismatch response could be the first one we receive + # when the topology is still unknown. Change to RS without primary + # in this case. + if topology.unknown? + @topology = Topology::ReplicaSetNoPrimary.new( + topology.options.merge(replica_set_name: updated_desc.replica_set_name), + topology.monitoring, self) + end + + servers = add_servers_from_desc(updated_desc) + # Spec tests require us to remove servers based on data in descrptions + # with me mismatches. The driver will be more resilient if it only + # removed servers from descriptions with matching mes. + remove_servers_not_in_desc(updated_desc) + + servers.each do |server| + server.start_monitoring + end + + # The rest of sdam flow assumes the server being removed is not the one + # whose description we are processing, and publishes description update + # event. Since we are removing the server whose response we are + # processing, do not publish description change event but mark it + # published (by assigning to @previous_desc). + do_remove(updated_desc.address.to_s) + @previous_desc = updated_desc + + # We may have removed the current primary, check if there is a primary. + check_if_has_primary + # Publish topology change event. + commit_changes + disconnect_servers + return + end + unless update_server_descriptions # All of the transitions require that server whose updated_desc we are # processing is still in the cluster (i.e., was not removed as a result # of processing another response, potentially concurrently). # If update_server_descriptions returned false we have no servers @@ -140,10 +195,11 @@ else raise ArgumentError, "Unknown topology #{topology.class}" end commit_changes + disconnect_servers end # Transitions from unknown to single topology type, when a standalone # server is discovered. def update_unknown_with_standalone @@ -335,13 +391,18 @@ updated_desc_address_strs = %w(hosts passives arbiters).map do |m| updated_desc.send(m) end.flatten servers_list.each do |server| unless updated_desc_address_strs.include?(address_str = server.address.to_s) + updated_host = updated_desc.address.to_s + if updated_desc.me && updated_desc.me != updated_host + updated_host += " (self-identified as #{updated_desc.me})" + end log_warn( "Removing server #{address_str} because it is not in hosts reported by primary " + - "#{updated_desc.address}" + "#{updated_host}. Reported hosts are: " + + updated_desc.hosts.join(', ') ) do_remove(address_str) end end end @@ -354,11 +415,25 @@ end # Removes specified server from topology and warns if the topology ends # up with an empty server list as a result def do_remove(address_str) - cluster.remove(address_str) + servers = cluster.remove(address_str, disconnect: false) + servers.each do |server| + # We need to publish server closed event here, but we cannot close + # the server because it could be the server owning the monitor in + # whose thread this flow is presently executing, in which case closing + # the server can terminate the thread and leave SDAM processing + # incomplete. Thus we have to remove the server from the cluster, + # publish the event, but do not call disconnect on the server until + # the very end when all processing has completed. + publish_sdam_event( + Mongo::Monitoring::SERVER_CLOSED, + Mongo::Monitoring::Event::ServerClosed.new(server.address, cluster.topology) + ) + end + @servers_to_disconnect += servers if servers_list.empty? log_warn( "Topology now has no servers - this is likely a misconfiguration of the cluster and/or the application" ) end @@ -449,10 +524,19 @@ @topology = topology.class.new(topology.options, topology.monitoring, cluster) # This sends the SDAM event cluster.update_topology(topology) end + def disconnect_servers + while server = @servers_to_disconnect.shift + if server.connected? + # Do not publish server closed event, as this was already done + server.disconnect! + end + end + end + # If the server being processed is identified as data bearing, creates the # server's connection pool so it can start populating def start_pool_if_data_bearing return if !updated_desc.data_bearing? @@ -466,10 +550,10 @@ # Checks if the cluster has a primary, and if not, transitions the topology # to ReplicaSetNoPrimary. Topology must be ReplicaSetWithPrimary when # invoking this method. def check_if_has_primary unless topology.replica_set? - raise ArgumentError, 'check_if_has_primary should only be called when topology is replica set' + raise ArgumentError, "check_if_has_primary should only be called when topology is replica set, but it is #{topology.class.name.sub(/.*::/, '')}" end primary = servers_list.detect do |server| # A primary with the wrong set name is not a primary server.primary? && server.description.replica_set_name == topology.replica_set_name