lib/mongo/cluster/sdam_flow.rb in mongo-2.11.1 vs lib/mongo/cluster/sdam_flow.rb in mongo-2.11.2
- old
+ new
@@ -28,10 +28,11 @@
def initialize(cluster, previous_desc, updated_desc)
@cluster = cluster
@topology = cluster.topology
@original_desc = @previous_desc = previous_desc
@updated_desc = updated_desc
+ @servers_to_disconnect = []
end
attr_reader :cluster
def_delegators :cluster, :servers_list, :seeds,
@@ -71,10 +72,64 @@
end
false
end
def server_description_changed
+ if updated_desc.me_mismatch? && updated_desc.primary? &&
+ (topology.unknown? || topology.replica_set?)
+ then
+ # When the driver receives a description claiming to be a primary,
+ # we are obligated by spec tests to add and remove hosts in that
+ # description even if it also has a me mismatch. The me mismatch
+ # scenario though presents a number of problems:
+ #
+ # 1. Effectively, the server's address changes, meaning we cannot
+ # update the description of the server whose description change we
+ # are processing (instead servers are added and removed), but we
+ # behave to an extent as if we are updating the description, which
+ # causes a bunch of awkwardness.
+ # 2. The server for which we are processing the response will be
+ # removed from topology, which may cause the current thread to terminate
+ # prior to running the entire sdam flow. To deal with this we separate
+ # the removal event publication from actually removing the server
+ # from topology, which again complicates the flow.
+
+ # Primary-with-me-mismatch response could be the first one we receive
+ # when the topology is still unknown. Change to RS without primary
+ # in this case.
+ if topology.unknown?
+ @topology = Topology::ReplicaSetNoPrimary.new(
+ topology.options.merge(replica_set_name: updated_desc.replica_set_name),
+ topology.monitoring, self)
+ end
+
+ servers = add_servers_from_desc(updated_desc)
+ # Spec tests require us to remove servers based on data in descrptions
+ # with me mismatches. The driver will be more resilient if it only
+ # removed servers from descriptions with matching mes.
+ remove_servers_not_in_desc(updated_desc)
+
+ servers.each do |server|
+ server.start_monitoring
+ end
+
+ # The rest of sdam flow assumes the server being removed is not the one
+ # whose description we are processing, and publishes description update
+ # event. Since we are removing the server whose response we are
+ # processing, do not publish description change event but mark it
+ # published (by assigning to @previous_desc).
+ do_remove(updated_desc.address.to_s)
+ @previous_desc = updated_desc
+
+ # We may have removed the current primary, check if there is a primary.
+ check_if_has_primary
+ # Publish topology change event.
+ commit_changes
+ disconnect_servers
+ return
+ end
+
unless update_server_descriptions
# All of the transitions require that server whose updated_desc we are
# processing is still in the cluster (i.e., was not removed as a result
# of processing another response, potentially concurrently).
# If update_server_descriptions returned false we have no servers
@@ -140,10 +195,11 @@
else
raise ArgumentError, "Unknown topology #{topology.class}"
end
commit_changes
+ disconnect_servers
end
# Transitions from unknown to single topology type, when a standalone
# server is discovered.
def update_unknown_with_standalone
@@ -335,13 +391,18 @@
updated_desc_address_strs = %w(hosts passives arbiters).map do |m|
updated_desc.send(m)
end.flatten
servers_list.each do |server|
unless updated_desc_address_strs.include?(address_str = server.address.to_s)
+ updated_host = updated_desc.address.to_s
+ if updated_desc.me && updated_desc.me != updated_host
+ updated_host += " (self-identified as #{updated_desc.me})"
+ end
log_warn(
"Removing server #{address_str} because it is not in hosts reported by primary " +
- "#{updated_desc.address}"
+ "#{updated_host}. Reported hosts are: " +
+ updated_desc.hosts.join(', ')
)
do_remove(address_str)
end
end
end
@@ -354,11 +415,25 @@
end
# Removes specified server from topology and warns if the topology ends
# up with an empty server list as a result
def do_remove(address_str)
- cluster.remove(address_str)
+ servers = cluster.remove(address_str, disconnect: false)
+ servers.each do |server|
+ # We need to publish server closed event here, but we cannot close
+ # the server because it could be the server owning the monitor in
+ # whose thread this flow is presently executing, in which case closing
+ # the server can terminate the thread and leave SDAM processing
+ # incomplete. Thus we have to remove the server from the cluster,
+ # publish the event, but do not call disconnect on the server until
+ # the very end when all processing has completed.
+ publish_sdam_event(
+ Mongo::Monitoring::SERVER_CLOSED,
+ Mongo::Monitoring::Event::ServerClosed.new(server.address, cluster.topology)
+ )
+ end
+ @servers_to_disconnect += servers
if servers_list.empty?
log_warn(
"Topology now has no servers - this is likely a misconfiguration of the cluster and/or the application"
)
end
@@ -449,10 +524,19 @@
@topology = topology.class.new(topology.options, topology.monitoring, cluster)
# This sends the SDAM event
cluster.update_topology(topology)
end
+ def disconnect_servers
+ while server = @servers_to_disconnect.shift
+ if server.connected?
+ # Do not publish server closed event, as this was already done
+ server.disconnect!
+ end
+ end
+ end
+
# If the server being processed is identified as data bearing, creates the
# server's connection pool so it can start populating
def start_pool_if_data_bearing
return if !updated_desc.data_bearing?
@@ -466,10 +550,10 @@
# Checks if the cluster has a primary, and if not, transitions the topology
# to ReplicaSetNoPrimary. Topology must be ReplicaSetWithPrimary when
# invoking this method.
def check_if_has_primary
unless topology.replica_set?
- raise ArgumentError, 'check_if_has_primary should only be called when topology is replica set'
+ raise ArgumentError, "check_if_has_primary should only be called when topology is replica set, but it is #{topology.class.name.sub(/.*::/, '')}"
end
primary = servers_list.detect do |server|
# A primary with the wrong set name is not a primary
server.primary? && server.description.replica_set_name == topology.replica_set_name