lib/mongo/server_selector/base.rb in mongo-2.16.4 vs lib/mongo/server_selector/base.rb in mongo-2.17.0
- old
+ new
@@ -160,21 +160,23 @@
# an eligible server.
# @param [ true, false ] ping Whether to ping the server before selection.
# Deprecated and ignored.
# @param [ Session | nil ] session Optional session to take into account
# for mongos pinning. Added in version 2.10.0.
+ # @param [ true | false ] write_aggregation Whether we need a server that
+ # supports writing aggregations (e.g. with $merge/$out) on secondaries.
#
# @return [ Mongo::Server ] A server matching the server preference.
#
# @raise [ Error::NoServerAvailable ] No server was found matching the
# specified preference / pinning requirement in the server selection
# timeout.
# @raise [ Error::LintError ] An unexpected condition was detected, and
# lint mode is enabled.
#
# @since 2.0.0
- def select_server(cluster, ping = nil, session = nil)
+ def select_server(cluster, ping = nil, session = nil, write_aggregation: false)
if cluster.topology.is_a?(Cluster::Topology::LoadBalanced)
return cluster.servers.first
end
server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT
@@ -241,11 +243,11 @@
raise Error::NoServerAvailable.new(self, cluster, msg)
end
=end
loop do
- server = try_select_server(cluster)
+ server = try_select_server(cluster, write_aggregation: write_aggregation)
if server
unless cluster.topology.compatible?
raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s
end
@@ -292,14 +294,34 @@
end
# Tries to find a suitable server, returns the server if one is available
# or nil if there isn't a suitable server.
#
+ # @param [ Mongo::Cluster ] cluster The cluster from which to select
+ # an eligible server.
+ # @param [ true | false ] write_aggregation Whether we need a server that
+ # supports writing aggregations (e.g. with $merge/$out) on secondaries.
+ #
# @return [ Server | nil ] A suitable server, if one exists.
#
# @api private
- def try_select_server(cluster)
- servers = suitable_servers(cluster)
+ def try_select_server(cluster, write_aggregation: false)
+ servers = if write_aggregation && cluster.replica_set?
+ # 1. Check if ALL servers in cluster support secondary writes.
+ is_write_supported = cluster.servers.reduce(true) do |res, server|
+ res && server.features.merge_out_on_secondary_enabled?
+ end
+
+ if is_write_supported
+ # 2. If all servers support secondary writes, we respect read preference.
+ suitable_servers(cluster)
+ else
+ # 3. Otherwise we fallback to primary for replica set.
+ [cluster.servers.detect(&:primary?)]
+ end
+ else
+ suitable_servers(cluster)
+ end
# This list of servers may be ordered in a specific way
# by the selector (e.g. for secondary preferred, the first
# server may be a secondary and the second server may be primary)
# and we should take the first server here respecting the order