lib/mongo/collection/view/aggregation.rb in mongo-2.16.4 vs lib/mongo/collection/view/aggregation.rb in mongo-2.17.0

- old
+ new

@@ -71,10 +71,29 @@ # # @param [ Collection::View ] view The collection view. # @param [ Array<Hash> ] pipeline The pipeline of operations. # @param [ Hash ] options The aggregation options. # + # @option options [ true, false ] :allow_disk_use Set to true if disk + # usage is allowed during the aggregation. + # @option options [ Integer ] :batch_size The number of documents to return + # per batch. + # @option options [ true, false ] :bypass_document_validation Whether or + # not to skip document level validation. + # @option options [ Hash ] :collation The collation to use. + # @option options [ String ] :comment Associate a comment with the aggregation. + # @option options [ String ] :hint The index to use for the aggregation. + # @option options [ Hash ] :let Mapping of variables to use in the pipeline. + # See the server documentation for details. + # @option options [ Integer ] :max_time_ms The maximum amount of time in + # milliseconds to allow the aggregation to run. + # @option options [ true, false ] :use_cursor Indicates whether the command + # will request that the server provide results using a cursor. Note that + # as of server version 3.6, aggregations always provide results using a + # cursor and this option is therefore not valid. + # @option options [ Session ] :session The session to use. + # # @since 2.0.0 def initialize(view, pipeline, options = {}) @view = view @pipeline = pipeline.dup @options = BSON::Document.new(options).freeze @@ -106,40 +125,66 @@ def server_selector @view.send(:server_selector) end - def aggregate_spec(session) - Builder::Aggregation.new(pipeline, view, options.merge(session: session)).specification + def aggregate_spec(server, session, read_preference) + Builder::Aggregation.new( + pipeline, + view, + options.merge(session: session, read_preference: read_preference) + ).specification end def new(options) Aggregation.new(view, pipeline, options) end - def initial_query_op(session) - Operation::Aggregate.new(aggregate_spec(session)) + def initial_query_op(server, session, read_preference) + Operation::Aggregate.new(aggregate_spec(server, session, read_preference)) end - def valid_server?(server) - if secondary_ok? - true + # Return effective read preference for the operation. + # + # If the pipeline contains $merge or $out, and read preference specified + # by user is secondary or secondary_preferred, and selected server is below + # 5.0, than this method returns primary read preference, because the + # aggregation will be routed to primary. Otherwise return the original + # read preference. + # + # See https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#read-preferences-and-server-selection + # + # @param [ Server ] server The server on which the operation + # should be executed. + # @return [ Hash | nil ] read preference hash that should be sent with + # this command. + def effective_read_preference(server) + return unless view.read_preference + return view.read_preference unless write? + return view.read_preference unless [:secondary, :secondary_preferred].include?(view.read_preference[:mode]) + + primary_read_preference = {mode: :primary} + if server.primary? + log_warn("Routing the Aggregation operation to the primary server") + primary_read_preference + elsif server.mongos? && !server.features.merge_out_on_secondary_enabled? + log_warn("Routing the Aggregation operation to the primary server") + primary_read_preference else - description = server.description - description.standalone? || description.mongos? || description.primary? || description.load_balancer? + view.read_preference end - end - def secondary_ok? - !write? end def send_initial_query(server, session) - unless valid_server?(server) - log_warn("Rerouting the Aggregation operation to the primary server - #{server.summary} is not suitable") - server = cluster.next_primary(nil, session) - end - initial_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session)) + initial_query_op( + server, + session, + effective_read_preference(server) + ).execute( + server, + context: Operation::Context.new(client: client, session: session) + ) end # Skip, sort, limit, projection are specified as pipeline stages # rather than as options. def cache_options