# Copyright (C) 2014-2015 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Collection
    class View

      # Provides behaviour around an aggregation pipeline on a collection view.
      #
      # @since 2.0.0
      class Aggregation
        extend Forwardable
        include Enumerable
        include Immutable
        include Iterable
        include Explainable
        include Loggable
        include Retryable

        # @return [ View ] view The collection view.
        attr_reader :view

        # @return [ Array ] pipeline The aggregation pipeline.
        attr_reader :pipeline

        # Delegate necessary operations to the view.
        def_delegators :view, :collection, :read, :cluster

        # Delegate necessary operations to the collection.
        def_delegators :collection, :database

        # Options mapping for an aggregation. Keys are the Ruby-side option
        # names, values are the field names written into the command selector.
        #
        # @since 2.1.0
        OPTIONS_MAP = {
          :allow_disk_use => :allowDiskUse,
          :max_time_ms => :maxTimeMS,
          :explain => :explain
        }.freeze

        # The reroute message.
        #
        # @since 2.1.0
        REROUTE = 'Rerouting the Aggregation operation to the primary server.'.freeze

        # Set to true if disk usage is allowed during the aggregation.
        #
        # @example Set disk usage flag.
        #   aggregation.allow_disk_use(true)
        #
        # @param [ true, false ] value The flag value.
        #
        # @return [ true, false, Aggregation ] The aggregation if a value was
        #   set or the value if used as a getter.
        #
        # @since 2.0.0
        def allow_disk_use(value = nil)
          configure(__method__, value)
        end

        # Initialize the aggregation for the provided collection view, pipeline
        # and options. The pipeline and options are shallow-copied so later
        # changes to the caller's containers do not affect this aggregation.
        #
        # @example Create the new aggregation view.
        #   Aggregation.new(view, pipeline)
        #
        # @param [ Collection::View ] view The collection view.
        # @param [ Array ] pipeline The pipeline of operations.
        # @param [ Hash ] options The aggregation options.
        #
        # @since 2.0.0
        def initialize(view, pipeline, options = {})
          @view = view
          @pipeline = pipeline.dup
          @options = options.dup
        end

        # Get the explain plan for the aggregation.
        #
        # @example Get the explain plan for the aggregation.
        #   aggregation.explain
        #
        # @return [ Hash ] The explain plan.
        #
        # @since 2.0.0
        def explain
          self.class.new(view, pipeline, options.merge(explain_options)).first
        end

        private

        # Build the specification handed to the aggregate operation.
        #
        # The :cursor field is only added when a cursor document exists.
        # When the cursor is disabled the field is omitted, because a nil
        # value would be written into the command as an explicit null.
        #
        # @return [ Hash ] The operation specification.
        def aggregate_spec
          selector = { :aggregate => collection.name, :pipeline => pipeline }
          cursor_doc = cursor
          selector[:cursor] = cursor_doc if cursor_doc
          {
            :db_name => database.name,
            :read => read,
            :selector => selector.merge!(agg_options)
          }
        end

        # Translate the options that appear in OPTIONS_MAP into their command
        # field names. Options without a mapping are ignored. The result is
        # memoized on the instance.
        #
        # @return [ Hash ] The mapped command options.
        def agg_options
          @agg_options ||= options.each_with_object({}) do |(key, value), opts|
            mapped = OPTIONS_MAP[key]
            opts[mapped] = value if mapped
          end
        end

        # The cursor document for the command.
        #
        # @return [ Hash, nil ] The batch size document when :use_cursor is
        #   true or unset, otherwise nil.
        def cursor
          if options[:use_cursor] == true || options[:use_cursor].nil?
            batch_size_doc
          end
        end

        # The batch size document, taken from the :batch_size option or, when
        # that is not set, from the view's batch size.
        #
        # @return [ Hash ] { :batchSize => value }, or an empty hash when no
        #   batch size is available.
        def batch_size_doc
          (value = options[:batch_size] || view.batch_size) ? { :batchSize => value } : {}
        end

        # The options merged in by #explain.
        #
        # @return [ Hash ] The explain options.
        def explain_options
          { :explain => true }
        end

        # Create a new aggregation over the same view and pipeline with the
        # provided options.
        # NOTE(review): presumably called by the Immutable mixin's configure
        # to produce modified copies - confirm against that module.
        #
        # @param [ Hash ] options The options for the new aggregation.
        #
        # @return [ Aggregation ] The new aggregation.
        def new(options)
          Aggregation.new(view, pipeline, options)
        end

        # The operation that runs the aggregate command.
        #
        # @return [ Operation::Aggregate ] The aggregate operation.
        def initial_query_op
          Operation::Aggregate.new(aggregate_spec)
        end

        # Whether the aggregation may be executed on the given server: it is
        # a standalone, a mongos or a primary, or the pipeline has no $out
        # stage.
        #
        # @param [ Server ] server The candidate server.
        #
        # @return [ true, false ] If the server can run this aggregation.
        def valid_server?(server)
          server.standalone? || server.mongos? || server.primary? || secondary_ok?
        end

        # Whether the pipeline is free of $out stages, checking both string
        # and symbol keys.
        #
        # @return [ true, false ] If no pipeline operator has an $out key.
        def secondary_ok?
          pipeline.none? { |op| op.key?('$out') || op.key?(:$out) }
        end

        # Execute the initial aggregate operation. When the given server is
        # not valid for this pipeline, a warning is logged and the operation
        # is sent to the cluster's next primary instead.
        #
        # @param [ Server ] server The selected server.
        #
        # @return [ Object ] The result of executing the aggregate operation.
        def send_initial_query(server)
          unless valid_server?(server)
            log_warn(REROUTE)
            server = cluster.next_primary
          end
          initial_query_op.execute(server.context)
        end
      end
    end
  end
end