lib/mongo/server_selector/base.rb in mongo-2.16.4 vs lib/mongo/server_selector/base.rb in mongo-2.17.0

- old
+ new

@@ -160,21 +160,23 @@ # an eligible server. # @param [ true, false ] ping Whether to ping the server before selection. # Deprecated and ignored. # @param [ Session | nil ] session Optional session to take into account # for mongos pinning. Added in version 2.10.0. + # @param [ true | false ] write_aggregation Whether we need a server that + # supports writing aggregations (e.g. with $merge/$out) on secondaries. # # @return [ Mongo::Server ] A server matching the server preference. # # @raise [ Error::NoServerAvailable ] No server was found matching the # specified preference / pinning requirement in the server selection # timeout. # @raise [ Error::LintError ] An unexpected condition was detected, and # lint mode is enabled. # # @since 2.0.0 - def select_server(cluster, ping = nil, session = nil) + def select_server(cluster, ping = nil, session = nil, write_aggregation: false) if cluster.topology.is_a?(Cluster::Topology::LoadBalanced) return cluster.servers.first end server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT @@ -241,11 +243,11 @@ raise Error::NoServerAvailable.new(self, cluster, msg) end =end loop do - server = try_select_server(cluster) + server = try_select_server(cluster, write_aggregation: write_aggregation) if server unless cluster.topology.compatible? raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s end @@ -292,14 +294,34 @@ end # Tries to find a suitable server, returns the server if one is available # or nil if there isn't a suitable server. # + # @param [ Mongo::Cluster ] cluster The cluster from which to select + # an eligible server. + # @param [ true | false ] write_aggregation Whether we need a server that + # supports writing aggregations (e.g. with $merge/$out) on secondaries. + # # @return [ Server | nil ] A suitable server, if one exists. # # @api private - def try_select_server(cluster) - servers = suitable_servers(cluster) + def try_select_server(cluster, write_aggregation: false) + servers = if write_aggregation && cluster.replica_set? + # 1. Check if ALL servers in cluster support secondary writes. + is_write_supported = cluster.servers.reduce(true) do |res, server| + res && server.features.merge_out_on_secondary_enabled? + end + + if is_write_supported + # 2. If all servers support secondary writes, we respect read preference. + suitable_servers(cluster) + else + # 3. Otherwise we fallback to primary for replica set. + [cluster.servers.detect(&:primary?)] + end + else + suitable_servers(cluster) + end # This list of servers may be ordered in a specific way # by the selector (e.g. for secondary preferred, the first # server may be a secondary and the second server may be primary) # and we should take the first server here respecting the order