lib/mongo/collection/view/aggregation.rb in mongo-2.20.1 vs lib/mongo/collection/view/aggregation.rb in mongo-2.21.0
- old
+ new
@@ -13,58 +13,25 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+require 'mongo/collection/view/aggregation/behavior'
+
module Mongo
class Collection
class View
# Provides behavior around an aggregation pipeline on a collection view.
#
# @since 2.0.0
class Aggregation
- extend Forwardable
- include Enumerable
- include Immutable
- include Iterable
- include Explainable
- include Loggable
- include Retryable
+ include Behavior
- # @return [ View ] view The collection view.
- attr_reader :view
# @return [ Array<Hash> ] pipeline The aggregation pipeline.
attr_reader :pipeline
- # Delegate necessary operations to the view.
- def_delegators :view, :collection, :read, :cluster
-
- # Delegate necessary operations to the collection.
- def_delegators :collection, :database, :client
-
- # The reroute message.
- #
- # @since 2.1.0
- # @deprecated
- REROUTE = 'Rerouting the Aggregation operation to the primary server.'.freeze
-
- # Set to true if disk usage is allowed during the aggregation.
- #
- # @example Set disk usage flag.
- # aggregation.allow_disk_use(true)
- #
- # @param [ true, false ] value The flag value.
- #
- # @return [ true, false, Aggregation ] The aggregation if a value was
- # set or the value if used as a getter.
- #
- # @since 2.0.0
- def allow_disk_use(value = nil)
- configure(:allow_disk_use, value)
- end
-
# Initialize the aggregation for the provided collection view, pipeline
# and options.
#
# @example Create the new aggregation view.
# Aggregation.view.new(view, pipeline)
@@ -84,63 +51,33 @@
# comment to attach to this command.
# @option options [ String ] :hint The index to use for the aggregation.
# @option options [ Hash ] :let Mapping of variables to use in the pipeline.
# See the server documentation for details.
# @option options [ Integer ] :max_time_ms The maximum amount of time in
- # milliseconds to allow the aggregation to run.
- # @option options [ true, false ] :use_cursor Indicates whether the command
- # will request that the server provide results using a cursor. Note that
- # as of server version 3.6, aggregations always provide results using a
- # cursor and this option is therefore not valid.
+ # milliseconds to allow the aggregation to run. This option is deprecated, use
+ # :timeout_ms instead.
# @option options [ Session ] :session The session to use.
+ # @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
+ # :timeout_ms (whether it applies to the lifetime of the cursor, or per
+ # iteration).
+ # @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
+ # Must be a non-negative integer. An explicit value of 0 means infinite.
+ # The default value is unset which means the value is inherited from
+ # the collection or the database or the client.
#
# @since 2.0.0
def initialize(view, pipeline, options = {})
- @view = view
- @pipeline = pipeline.dup
- unless Mongo.broken_view_aggregate || view.filter.empty?
- @pipeline.unshift(:$match => view.filter)
+ perform_setup(view, options) do
+ @pipeline = pipeline.dup
+ unless Mongo.broken_view_aggregate || view.filter.empty?
+ @pipeline.unshift(:$match => view.filter)
+ end
end
- @options = BSON::Document.new(options).freeze
end
- # Get the explain plan for the aggregation.
- #
- # @example Get the explain plan for the aggregation.
- # aggregation.explain
- #
- # @return [ Hash ] The explain plan.
- #
- # @since 2.0.0
- def explain
- self.class.new(view, pipeline, options.merge(explain: true)).first
- end
-
- # Whether this aggregation will write its result to a database collection.
- #
- # @return [ Boolean ] Whether the aggregation will write its result
- # to a collection.
- #
- # @api private
- def write?
- pipeline.any? { |op| op.key?('$out') || op.key?(:$out) || op.key?('$merge') || op.key?(:$merge) }
- end
-
private
- def server_selector
- @view.send(:server_selector)
- end
-
- def aggregate_spec(session, read_preference)
- Builder::Aggregation.new(
- pipeline,
- view,
- options.merge(session: session, read_preference: read_preference)
- ).specification
- end
-
def new(options)
Aggregation.new(view, pipeline, options)
end
def initial_query_op(session, read_preference)
@@ -178,34 +115,31 @@
view.read_preference
end
end
- def send_initial_query(server, session)
- server.with_connection do |connection|
+ def send_initial_query(server, context)
+ if server.load_balancer?
+ # Connection will be checked in when cursor is drained.
+ connection = server.pool.check_out(context: context)
initial_query_op(
- session,
+ context.session,
effective_read_preference(connection)
).execute_with_connection(
connection,
- context: Operation::Context.new(client: client, session: session)
+ context: context
)
+ else
+ server.with_connection do |connection|
+ initial_query_op(
+ context.session,
+ effective_read_preference(connection)
+ ).execute_with_connection(
+ connection,
+ context: context
+ )
+ end
end
- end
-
- # Skip, sort, limit, projection are specified as pipeline stages
- # rather than as options.
- def cache_options
- {
- namespace: collection.namespace,
- selector: pipeline,
- read_concern: view.read_concern,
- read_preference: view.read_preference,
- collation: options[:collation],
- # Aggregations can read documents from more than one collection,
- # so they will be cleared on every write operation.
- multi_collection: true,
- }
end
end
end
end
end