lib/mongo/server_selector/base.rb in mongo-2.13.0.beta1 vs lib/mongo/server_selector/base.rb in mongo-2.13.0.rc1
- old
+ new
@@ -14,12 +14,356 @@
module Mongo
module ServerSelector
- # @api private
class Base
+ # Initialize the server selector.
+ #
+ # @example Initialize the selector.
+ # => [{'dc' => 'nyc'}])
+ #
+ # @example Initialize the preference with no options.
+ #
+ #
+ # @param [ Hash ] options The server preference options.
+ #
+ # @option options [ Integer ] :local_threshold The local threshold boundary for
+ # nearest selection in seconds.
+ # @option options [ Integer ] max_staleness The maximum replication lag,
+ # in seconds, that a secondary can suffer and still be eligible for a read.
+ # A value of -1 is treated identically to nil, which is to not
+ # have a maximum staleness.
+ # @option options [ Hash | nil ] hedge A Hash specifying whether to enable hedged
+ # reads on the server. Hedged reads are not enabled by default. When
+ # specifying this option, it must be in the format: { enabled: true },
+ # where the value of the :enabled key is a boolean value.
+ #
+ # @raise [ Error::InvalidServerPreference ] If tag sets are specified
+ # but not allowed.
+ #
+ # @api private
+ def initialize(options = nil)
+ options = options ? options.dup : {}
+ if options[:max_staleness] == -1
+ options.delete(:max_staleness)
+ end
+ @options = options
+ @tag_sets = options[:tag_sets] || []
+ @max_staleness = options[:max_staleness]
+ @hedge = options[:hedge]
+ validate!
+ end
+ # @return [ Hash ] options The options.
+ attr_reader :options
+ # @return [ Array ] tag_sets The tag sets used to select servers.
+ attr_reader :tag_sets
+ # @return [ Integer ] max_staleness The maximum replication lag, in
+ # seconds, that a secondary can suffer and still be eligible for a read.
+ #
+ # @since 2.4.0
+ attr_reader :max_staleness
+ # @return [ Hash | nil ] hedge The document specifying whether to enable
+ # hedged reads.
+ attr_reader :hedge
+ # Get the timeout for server selection.
+ #
+ # @example Get the server selection timeout, in seconds.
+ # selector.server_selection_timeout
+ #
+ # @return [ Float ] The timeout.
+ #
+ # @since 2.0.0
+ #
+ # @deprecated This setting is now taken from the cluster options when
+ # a server is selected. Will be removed in version 3.0.
+ def server_selection_timeout
+ @server_selection_timeout ||=
+ (options[:server_selection_timeout] || ServerSelector::SERVER_SELECTION_TIMEOUT)
+ end
+ # Get the local threshold boundary for nearest selection in seconds.
+ #
+ # @example Get the local threshold.
+ # selector.local_threshold
+ #
+ # @return [ Float ] The local threshold.
+ #
+ # @since 2.0.0
+ #
+ # @deprecated This setting is now taken from the cluster options when
+ # a server is selected. Will be removed in version 3.0.
+ def local_threshold
+ @local_threshold ||= (options[:local_threshold] || ServerSelector::LOCAL_THRESHOLD)
+ end
+ # @api private
+ def local_threshold_with_cluster(cluster)
+ options[:local_threshold] || cluster.options[:local_threshold] || LOCAL_THRESHOLD
+ end
+ # Inspect the server selector.
+ #
+ # @example Inspect the server selector.
+ # selector.inspect
+ #
+ # @return [ String ] The inspection.
+ #
+ # @since 2.2.0
+ def inspect
+ "#<#{}:0x#{object_id} tag_sets=#{tag_sets.inspect} max_staleness=#{max_staleness.inspect} hedge=#{hedge}>"
+ end
+ # Check equality of two server selectors.
+ #
+ # @example Check server selector equality.
+ # preference == other
+ #
+ # @param [ Object ] other The other preference.
+ #
+ # @return [ true, false ] Whether the objects are equal.
+ #
+ # @since 2.0.0
+ def ==(other)
+ name == && hedge == other.hedge &&
+ max_staleness == other.max_staleness && tag_sets == other.tag_sets
+ end
+ # Select a server from the specified cluster, taking into account
+ # mongos pinning for the specified session.
+ #
+ # If the session is given and has a pinned server, this server is the
+ # only server considered for selection. If the server is of type mongos,
+ # it is returned immediately; otherwise monitoring checks on this
+ # server are initiated to update its status, and if the server becomes
+ # a mongos within the server selection timeout, it is returned.
+ #
+ # If no session is given or the session does not have a pinned server,
+ # normal server selection process is performed among all servers in the
+ # specified cluster matching the preference of this server selector
+ # object. Monitoring checks are initiated on servers in the cluster until
+ # a suitable server is found, up to the server selection timeout.
+ #
+ # If a suitable server is not found within the server selection timeout,
+ # this method raises Error::NoServerAvailable.
+ #
+ # @param [ Mongo::Cluster ] cluster The cluster from which to select
+ # an eligible server.
+ # @param [ true, false ] ping Whether to ping the server before selection.
+ # Deprecated and ignored.
+ # @param [ Session | nil ] session Optional session to take into account
+ # for mongos pinning. Added in version 2.10.0.
+ #
+ # @return [ Mongo::Server ] A server matching the server preference.
+ #
+ # @raise [ Error::NoServerAvailable ] No server was found matching the
+ # specified preference / pinning requirement in the server selection
+ # timeout.
+ # @raise [ Error::LintError ] An unexpected condition was detected, and
+ # lint mode is enabled.
+ #
+ # @since 2.0.0
+ def select_server(cluster, ping = nil, session = nil)
+ server_selection_timeout = cluster.options[:server_selection_timeout] || SERVER_SELECTION_TIMEOUT
+ # Special handling for zero timeout: if we have to select a server,
+ # and the timeout is zero, fail immediately (since server selection
+ # will take some non-zero amount of time in any case).
+ if server_selection_timeout == 0
+ msg = "Failing server selection due to zero timeout. " +
+ " Requested #{name} in cluster: #{cluster.summary}"
+ raise, cluster, msg)
+ end
+ deadline = + server_selection_timeout
+ if session && session.pinned_server
+ if Mongo::Lint.enabled?
+ unless cluster.sharded?
+ raise Error::LintError, "Session has a pinned server in a non-sharded topology: #{topology}"
+ end
+ end
+ if !session.in_transaction?
+ session.unpin
+ end
+ if server = session.pinned_server
+ # Here we assume that a mongos stays in the topology indefinitely.
+ # This will no longer be the case once SRV polling is implemented.
+ unless server.mongos?
+ while (time_remaining = deadline - > 0
+ wait_for_server_selection(cluster, time_remaining)
+ end
+ unless server.mongos?
+ msg = "The session being used is pinned to the server which is not a mongos: #{server.summary} " +
+ "(after #{server_selection_timeout} seconds)"
+ raise, cluster, msg)
+ end
+ end
+ return server
+ end
+ end
+ if cluster.replica_set?
+ validate_max_staleness_value_early!
+ end
+ if cluster.addresses.empty?
+ if Lint.enabled?
+ unless cluster.servers.empty?
+ raise Error::LintError, "Cluster has no addresses but has servers: #{', ')}"
+ end
+ end
+ msg = "Cluster has no addresses, and therefore will never have a server"
+ raise, cluster, msg)
+ end
+=begin Add this check in version 3.0.0
+ unless cluster.connected?
+ msg = 'Cluster is disconnected'
+ raise, cluster, msg)
+ end
+ loop do
+ server = try_select_server(cluster)
+ if server
+ unless cluster.topology.compatible?
+ raise Error::UnsupportedFeatures, cluster.topology.compatibility_error.to_s
+ end
+ if session && session.starting_transaction? && cluster.sharded?
+ end
+ return server
+ end
+ cluster.scan!(false)
+ time_remaining = deadline -
+ if time_remaining > 0
+ wait_for_server_selection(cluster, time_remaining)
+ # If we wait for server selection, perform another round of
+ # attempting to locate a suitable server. Otherwise server selection
+ # can raise NoServerAvailable message when the diagnostics
+ # reports an available server of the requested type.
+ else
+ break
+ end
+ end
+ msg = "No #{name} server is available in cluster: #{cluster.summary} " +
+ "with timeout=#{server_selection_timeout}, " +
+ "LT=#{local_threshold_with_cluster(cluster)}"
+ msg += server_selection_diagnostic_message(cluster)
+ raise, cluster, msg)
+ rescue Error::NoServerAvailable => e
+ if session && session.in_transaction? && !session.committing_transaction?
+ e.add_label('TransientTransactionError')
+ end
+ if session && session.committing_transaction?
+ e.add_label('UnknownTransactionCommitResult')
+ end
+ raise e
+ end
+ # Tries to find a suitable server, returns the server if one is available
+ # or nil if there isn't a suitable server.
+ #
+ # @return [ Server | nil ] A suitable server, if one exists.
+ #
+ # @api private
+ def try_select_server(cluster)
+ servers = suitable_servers(cluster)
+ # This list of servers may be ordered in a specific way
+ # by the selector (e.g. for secondary preferred, the first
+ # server may be a secondary and the second server may be primary)
+ # and we should take the first server here respecting the order
+ server = servers.first
+ if server
+ if Lint.enabled?
+ # It is possible for a server to have a nil average RTT here
+ # because the ARTT comes from description which may be updated
+ # by a background thread while server selection is running.
+ # Currently lint mode is not a public feature, if/when this
+ # changes ( the
+ # requirement for ARTT to be not nil would need to be removed.
+ if server.average_round_trip_time.nil?
+ raise Error::LintError, "Server #{server.address} has nil average rtt"
+ end
+ end
+ end
+ server
+ end
+ # Returns servers of acceptable types from the cluster.
+ #
+ # Does not perform staleness validation, staleness filtering or
+ # latency filtering.
+ #
+ # @param [ Cluster ] cluster The cluster.
+ #
+ # @return [ Array<Server> ] The candidate servers.
+ #
+ # @api private
+ def candidates(cluster)
+ servers = cluster.servers
+ servers.each do |server|
+ validate_max_staleness_support!(server)
+ end
+ if cluster.single?
+ servers
+ elsif cluster.sharded?
+ servers
+ elsif cluster.replica_set?
+ select_in_replica_set(servers)
+ else
+ # Unknown cluster - no servers
+ []
+ end
+ end
+ # Returns servers satisfying the server selector from the cluster.
+ #
+ # @param [ Cluster ] cluster The cluster.
+ #
+ # @return [ Array<Server> ] The suitable servers.
+ #
+ # @api private
+ def suitable_servers(cluster)
+ if cluster.single?
+ candidates(cluster)
+ elsif cluster.sharded?
+ local_threshold = local_threshold_with_cluster(cluster)
+ servers = candidates(cluster)
+ near_servers(servers, local_threshold)
+ elsif cluster.replica_set?
+ validate_max_staleness_value!(cluster)
+ candidates(cluster)
+ else
+ # Unknown cluster - no servers
+ []
+ end
+ end
# Convert this server preference definition into a format appropriate
# for sending to a MongoDB server (i.e., as a command field).
@@ -32,9 +376,243 @@
preference.update(tags: tag_sets) unless tag_sets.empty?
preference.update(maxStalenessSeconds: max_staleness) if max_staleness
preference.update(hedge: hedge) if hedge
+ end
+ # Select the primary from a list of provided candidates.
+ #
+ # @param [ Array ] candidates List of candidate servers to select the
+ # primary from.
+ #
+ # @return [ Array ] The primary.
+ #
+ # @since 2.0.0
+ def primary(candidates)
+ do |server|
+ server.primary?
+ end
+ end
+ # Select the secondaries from a list of provided candidates.
+ #
+ # @param [ Array ] candidates List of candidate servers to select the
+ # secondaries from.
+ #
+ # @return [ Array ] The secondary servers.
+ #
+ # @since 2.0.0
+ def secondaries(candidates)
+ matching_servers =
+ matching_servers = filter_stale_servers(matching_servers, primary(candidates).first)
+ matching_servers = match_tag_sets(matching_servers) unless tag_sets.empty?
+ # Per server selection spec the server selected MUST be a random
+ # one matching staleness and latency requirements.
+ # Selectors always pass the output of #secondaries to #nearest
+ # which shuffles the server list, fulfilling this requirement.
+ matching_servers
+ end
+ # Select the near servers from a list of provided candidates, taking the
+ # local threshold into account.
+ #
+ # @param [ Array ] candidates List of candidate servers to select the
+ # near servers from.
+ # @param [ Integer ] local_threshold Local threshold. This parameter
+ # will be required in driver version 3.0.
+ #
+ # @return [ Array ] The near servers.
+ #
+ # @since 2.0.0
+ def near_servers(candidates = [], local_threshold = nil)
+ return candidates if candidates.empty?
+ # Average RTT on any server may change at any time by the server
+ # monitor's background thread. ARTT may also become nil if the
+ # server is marked unknown. Take a snapshot of ARTTs for the duration
+ # of this method.
+ candidates = do |server|
+ {server: server, artt: server.average_round_trip_time}
+ end.reject do |candidate|
+ candidate[:artt].nil?
+ end
+ return candidates if candidates.empty?
+ nearest_candidate = candidates.min_by do |candidate|
+ candidate[:artt]
+ end
+ # Default for legacy signarure
+ local_threshold ||= self.local_threshold
+ threshold = nearest_candidate[:artt] + local_threshold
+ do |candidate|
+ candidate[:artt] <= threshold
+ do |candidate|
+ candidate[:server]
+ end.shuffle!
+ end
+ # Select the servers matching the defined tag sets.
+ #
+ # @param [ Array ] candidates List of candidate servers from which those
+ # matching the defined tag sets should be selected.
+ #
+ # @return [ Array ] The servers matching the defined tag sets.
+ #
+ # @since 2.0.0
+ def match_tag_sets(candidates)
+ matches = []
+ tag_sets.find do |tag_set|
+ matches = { |server| server.matches_tag_set?(tag_set) }
+ !matches.empty?
+ end
+ matches || []
+ end
+ def filter_stale_servers(candidates, primary = nil)
+ return candidates unless @max_staleness
+ # last_scan is filled out by the Monitor, and can be nil if a server
+ # had its description manually set rather than being normally updated
+ # via the SDAM flow. We don't handle the possibility of a nil
+ # last_scan here.
+ if primary
+ do |server|
+ validate_max_staleness_support!(server)
+ staleness = (server.last_scan - server.last_write_date) -
+ (primary.last_scan - primary.last_write_date) +
+ server.cluster.heartbeat_interval
+ staleness <= @max_staleness
+ end
+ else
+ max_write_date = candidates.collect(&:last_write_date).max
+ do |server|
+ validate_max_staleness_support!(server)
+ staleness = max_write_date - server.last_write_date + server.cluster.heartbeat_interval
+ staleness <= @max_staleness
+ end
+ end
+ end
+ def validate!
+ if !@tag_sets.all? { |set| set.empty? } && !tags_allowed?
+ raise
+ elsif @max_staleness && !max_staleness_allowed?
+ raise
+ end
+ if @hedge
+ unless hedge_allowed?
+ raise
+ end
+ unless @hedge.is_a?(Hash) && @hedge.key?(:enabled) &&
+ [true, false].include?(@hedge[:enabled])
+ raise
+ "`hedge` value (#{hedge}) is invalid - hedge must be a Hash in the " \
+ "format { enabled: true }"
+ )
+ end
+ end
+ end
+ def validate_max_staleness_support!(server)
+ if @max_staleness && !server.features.max_staleness_enabled?
+ raise
+ end
+ end
+ def validate_max_staleness_value_early!
+ if @max_staleness
+ unless @max_staleness >= SMALLEST_MAX_STALENESS_SECONDS
+ msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
+ raise
+ end
+ end
+ end
+ def validate_max_staleness_value!(cluster)
+ if @max_staleness
+ heartbeat_interval = cluster.heartbeat_interval
+ unless @max_staleness >= [
+ min_cluster_staleness = heartbeat_interval + Cluster::IDLE_WRITE_PERIOD_SECONDS,
+ ].max
+ msg = "`max_staleness` value (#{@max_staleness}) is too small - it must be at least " +
+ "`Mongo::ServerSelector::SMALLEST_MAX_STALENESS_SECONDS` (#{ServerSelector::SMALLEST_MAX_STALENESS_SECONDS}) and (the cluster's heartbeat_frequency " +
+ "setting + `Mongo::Cluster::IDLE_WRITE_PERIOD_SECONDS`) (#{min_cluster_staleness})"
+ raise
+ end
+ end
+ end
+ # Waits for server state changes in the specified cluster.
+ #
+ # If the cluster has a server selection semaphore, waits on that
+ # semaphore up to the specified remaining time. Any change in server
+ # state resulting from SDAM will immediately wake up this method and
+ # cause it to return.
+ #
+ # If the cluster des not have a server selection semaphore, waits
+ # the smaller of 0.25 seconds and the specified remaining time.
+ # This functionality is provided for backwards compatibilty only for
+ # applications directly invoking the server selection process.
+ # If lint mode is enabled and the cluster does not have a server
+ # selection semaphore, Error::LintError will be raised.
+ #
+ # @param [ Cluster ] cluster The cluster to wait for.
+ # @param [ Numeric ] time_remaining Maximum time to wait, in seconds.
+ def wait_for_server_selection(cluster, time_remaining)
+ if cluster.server_selection_semaphore
+ # Since the semaphore may have been signaled between us checking
+ # the servers list earlier and the wait call below, we should not
+ # wait for the full remaining time - wait for up to 1 second, then
+ # recheck the state.
+ cluster.server_selection_semaphore.wait([time_remaining, 1].min)
+ else
+ if Lint.enabled?
+ raise Error::LintError, 'Waiting for server selection without having a server selection semaphore'
+ end
+ sleep [time_remaining, 0.25].min
+ end
+ end
+ # Creates a diagnostic message when server selection fails.
+ #
+ # The diagnostic message includes the following information, as applicable:
+ #
+ # - Servers having dead monitor threads
+ # - Cluster is disconnected
+ #
+ # If none of the conditions for diagnostic messages apply, an empty string
+ # is returned.
+ #
+ # @param [ Cluster ] cluster The cluster on which server selection was
+ # performed.
+ #
+ # @return [ String ] The diagnostic message.
+ def server_selection_diagnostic_message(cluster)
+ msg = ''
+ dead_monitors = []
+ cluster.servers_list.each do |server|
+ thread = server.monitor.instance_variable_get('@thread')
+ if thread.nil? || !thread.alive?
+ dead_monitors << server
+ end
+ end
+ if dead_monitors.any?
+ msg += ". The following servers have dead monitor threads: #{', ')}"
+ end
+ unless cluster.connected?
+ msg += ". The cluster is disconnected (client may have been closed)"
+ end
+ msg