lib/mongo/collection/view/aggregation.rb in mongo-2.20.1 vs lib/mongo/collection/view/aggregation.rb in mongo-2.21.0

- old
+ new

@@ -13,58 +13,25 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/collection/view/aggregation/behavior' + module Mongo class Collection class View # Provides behavior around an aggregation pipeline on a collection view. # # @since 2.0.0 class Aggregation - extend Forwardable - include Enumerable - include Immutable - include Iterable - include Explainable - include Loggable - include Retryable + include Behavior - # @return [ View ] view The collection view. - attr_reader :view # @return [ Array<Hash> ] pipeline The aggregation pipeline. attr_reader :pipeline - # Delegate necessary operations to the view. - def_delegators :view, :collection, :read, :cluster - - # Delegate necessary operations to the collection. - def_delegators :collection, :database, :client - - # The reroute message. - # - # @since 2.1.0 - # @deprecated - REROUTE = 'Rerouting the Aggregation operation to the primary server.'.freeze - - # Set to true if disk usage is allowed during the aggregation. - # - # @example Set disk usage flag. - # aggregation.allow_disk_use(true) - # - # @param [ true, false ] value The flag value. - # - # @return [ true, false, Aggregation ] The aggregation if a value was - # set or the value if used as a getter. - # - # @since 2.0.0 - def allow_disk_use(value = nil) - configure(:allow_disk_use, value) - end - # Initialize the aggregation for the provided collection view, pipeline # and options. # # @example Create the new aggregation view. # Aggregation.view.new(view, pipeline) @@ -84,63 +51,33 @@ # comment to attach to this command. # @option options [ String ] :hint The index to use for the aggregation. # @option options [ Hash ] :let Mapping of variables to use in the pipeline. # See the server documentation for details. # @option options [ Integer ] :max_time_ms The maximum amount of time in - # milliseconds to allow the aggregation to run. - # @option options [ true, false ] :use_cursor Indicates whether the command - # will request that the server provide results using a cursor. Note that - # as of server version 3.6, aggregations always provide results using a - # cursor and this option is therefore not valid. + # milliseconds to allow the aggregation to run. This option is deprecated, use + # :timeout_ms instead. # @option options [ Session ] :session The session to use. + # @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret + # :timeout_ms (whether it applies to the lifetime of the cursor, or per + # iteration). + # @option options [ Integer ] :timeout_ms The operation timeout in milliseconds. + # Must be a non-negative integer. An explicit value of 0 means infinite. + # The default value is unset which means the value is inherited from + # the collection or the database or the client. # # @since 2.0.0 def initialize(view, pipeline, options = {}) - @view = view - @pipeline = pipeline.dup - unless Mongo.broken_view_aggregate || view.filter.empty? - @pipeline.unshift(:$match => view.filter) + perform_setup(view, options) do + @pipeline = pipeline.dup + unless Mongo.broken_view_aggregate || view.filter.empty? + @pipeline.unshift(:$match => view.filter) + end end - @options = BSON::Document.new(options).freeze end - # Get the explain plan for the aggregation. - # - # @example Get the explain plan for the aggregation. - # aggregation.explain - # - # @return [ Hash ] The explain plan. - # - # @since 2.0.0 - def explain - self.class.new(view, pipeline, options.merge(explain: true)).first - end - - # Whether this aggregation will write its result to a database collection. - # - # @return [ Boolean ] Whether the aggregation will write its result - # to a collection. - # - # @api private - def write? - pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) } - end - private - def server_selector - @view.send(:server_selector) - end - - def aggregate_spec(session, read_preference) - Builder::Aggregation.new( - pipeline, - view, - options.merge(session: session, read_preference: read_preference) - ).specification - end - def new(options) Aggregation.new(view, pipeline, options) end def initial_query_op(session, read_preference) @@ -178,34 +115,31 @@ view.read_preference end end - def send_initial_query(server, session) - server.with_connection do |connection| + def send_initial_query(server, context) + if server.load_balancer? + # Connection will be checked in when cursor is drained. + connection = server.pool.check_out(context: context) initial_query_op( - session, + context.session, effective_read_preference(connection) ).execute_with_connection( connection, - context: Operation::Context.new(client: client, session: session) + context: context ) + else + server.with_connection do |connection| + initial_query_op( + context.session, + effective_read_preference(connection) + ).execute_with_connection( + connection, + context: context + ) + end end - end - - # Skip, sort, limit, projection are specified as pipeline stages - # rather than as options. - def cache_options - { - namespace: collection.namespace, - selector: pipeline, - read_concern: view.read_concern, - read_preference: view.read_preference, - collation: options[:collation], - # Aggregations can read documents from more than one collection, - # so they will be cleared on every write operation. - multi_collection: true, - } end end end end end