lib/mongo/collection/view/aggregation.rb in mongo-2.16.4 vs lib/mongo/collection/view/aggregation.rb in mongo-2.17.0
- old
+ new
@@ -71,10 +71,29 @@
#
# @param [ Collection::View ] view The collection view.
# @param [ Array<Hash> ] pipeline The pipeline of operations.
# @param [ Hash ] options The aggregation options.
#
+ # @option options [ true, false ] :allow_disk_use Set to true if disk
+ # usage is allowed during the aggregation.
+ # @option options [ Integer ] :batch_size The number of documents to return
+ # per batch.
+ # @option options [ true, false ] :bypass_document_validation Whether or
+ # not to skip document level validation.
+ # @option options [ Hash ] :collation The collation to use.
+ # @option options [ String ] :comment Associate a comment with the aggregation.
+ # @option options [ String ] :hint The index to use for the aggregation.
+ # @option options [ Hash ] :let Mapping of variables to use in the pipeline.
+ # See the server documentation for details.
+ # @option options [ Integer ] :max_time_ms The maximum amount of time in
+ # milliseconds to allow the aggregation to run.
+ # @option options [ true, false ] :use_cursor Indicates whether the command
+ # will request that the server provide results using a cursor. Note that
+ # as of server version 3.6, aggregations always provide results using a
+ # cursor and this option is therefore not valid.
+ # @option options [ Session ] :session The session to use.
+ #
# @since 2.0.0
def initialize(view, pipeline, options = {})
@view = view
@pipeline = pipeline.dup
@options = BSON::Document.new(options).freeze
@@ -106,40 +125,66 @@
def server_selector
@view.send(:server_selector)
end
- def aggregate_spec(session)
- Builder::Aggregation.new(pipeline, view, options.merge(session: session)).specification
+ def aggregate_spec(server, session, read_preference)
+ Builder::Aggregation.new(
+ pipeline,
+ view,
+ options.merge(session: session, read_preference: read_preference)
+ ).specification
end
def new(options)
Aggregation.new(view, pipeline, options)
end
- def initial_query_op(session)
- Operation::Aggregate.new(aggregate_spec(session))
+ def initial_query_op(server, session, read_preference)
+ Operation::Aggregate.new(aggregate_spec(server, session, read_preference))
end
- def valid_server?(server)
- if secondary_ok?
- true
+ # Return effective read preference for the operation.
+ #
+ # If the pipeline contains $merge or $out, and read preference specified
+ # by user is secondary or secondary_preferred, and selected server is below
+ # 5.0, than this method returns primary read preference, because the
+ # aggregation will be routed to primary. Otherwise return the original
+ # read preference.
+ #
+ # See https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#read-preferences-and-server-selection
+ #
+ # @param [ Server ] server The server on which the operation
+ # should be executed.
+ # @return [ Hash | nil ] read preference hash that should be sent with
+ # this command.
+ def effective_read_preference(server)
+ return unless view.read_preference
+ return view.read_preference unless write?
+ return view.read_preference unless [:secondary, :secondary_preferred].include?(view.read_preference[:mode])
+
+ primary_read_preference = {mode: :primary}
+ if server.primary?
+ log_warn("Routing the Aggregation operation to the primary server")
+ primary_read_preference
+ elsif server.mongos? && !server.features.merge_out_on_secondary_enabled?
+ log_warn("Routing the Aggregation operation to the primary server")
+ primary_read_preference
else
- description = server.description
- description.standalone? || description.mongos? || description.primary? || description.load_balancer?
+ view.read_preference
end
- end
- def secondary_ok?
- !write?
end
def send_initial_query(server, session)
- unless valid_server?(server)
- log_warn("Rerouting the Aggregation operation to the primary server - #{server.summary} is not suitable")
- server = cluster.next_primary(nil, session)
- end
- initial_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
+ initial_query_op(
+ server,
+ session,
+ effective_read_preference(server)
+ ).execute(
+ server,
+ context: Operation::Context.new(client: client, session: session)
+ )
end
# Skip, sort, limit, projection are specified as pipeline stages
# rather than as options.
def cache_options