# frozen_string_literal: true
# encoding: utf-8

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
  class Collection
    class View

      # Defines read related behavior for collection view.
      #
      # @since 2.0.0
      module Readable

        # Execute an aggregation on the collection view.
        #
        # @example Aggregate documents.
        #   view.aggregate([
        #     { "$group" => { "_id" => "$city", "tpop" => { "$sum" => "$pop" }}}
        #   ])
        #
        # @param [ Array ] pipeline The aggregation pipeline.
        # @param [ Hash ] options The aggregation options.
        #
        # @option options [ true, false ] :allow_disk_use Set to true if disk
        #   usage is allowed during the aggregation.
        # @option options [ Integer ] :batch_size The number of documents to return
        #   per batch.
        # @option options [ true, false ] :bypass_document_validation Whether or
        #   not to skip document level validation.
        # @option options [ Hash ] :collation The collation to use.
        # @option options [ String ] :comment Associate a comment with the aggregation.
        # @option options [ String ] :hint The index to use for the aggregation.
        # @option options [ Hash ] :let Mapping of variables to use in the pipeline.
        #   See the server documentation for details.
        # @option options [ Integer ] :max_time_ms The maximum amount of time in
        #   milliseconds to allow the aggregation to run.
        # @option options [ true, false ] :use_cursor Indicates whether the command
        #   will request that the server provide results using a cursor. Note that
        #   as of server version 3.6, aggregations always provide results using a
        #   cursor and this option is therefore not valid.
        # @option options [ Session ] :session The session to use.
        #
        # @return [ Aggregation ] The aggregation object.
        #
        # @since 2.0.0
        def aggregate(pipeline, options = {})
          aggregation = Aggregation.new(self, pipeline, options)

          # Because the $merge and $out pipeline stages write documents to the
          # collection, it is necessary to clear the cache when they are performed.
          #
          # Opt to clear the entire cache rather than one namespace because
          # the $out and $merge stages do not have to write to the same namespace
          # on which the aggregation is performed.
          QueryCache.clear if aggregation.write?

          aggregation
        end

        # Allows the server to write temporary data to disk while executing
        # a find operation.
        #
        # @example Allow disk use.
        #   view.allow_disk_use
        #
        # @return [ View ] The new view.
        def allow_disk_use
          configure(:allow_disk_use, true)
        end

        # Allows the query to get partial results if some shards are down.
        #
        # @example Allow partial results.
        #   view.allow_partial_results
        #
        # @return [ View ] The new view.
        #
        # @since 2.0.0
        def allow_partial_results
          configure(:allow_partial_results, true)
        end

        # Tell the query's cursor to stay open and wait for data.
        #
        # @example Await data on the cursor.
        #   view.await_data
        #
        # @return [ View ] The new view.
        #
        # @since 2.0.0
        def await_data
          configure(:await_data, true)
        end

        # The number of documents returned in each batch of results from MongoDB.
        #
        # @example Set the batch size.
        #   view.batch_size(5)
        #
        # @note Specifying 1 or a negative number is analogous to setting a limit.
        #
        # @param [ Integer ] batch_size The size of each batch of results.
        #
        # @return [ Integer, View ] Either the batch_size value or a
        #   new +View+.
        #
        # @since 2.0.0
        def batch_size(batch_size = nil)
          configure(:batch_size, batch_size)
        end

        # Associate a comment with the query.
        #
        # @example Add a comment.
        #   view.comment('slow query')
        #
        # @note Set profilingLevel to 2 and the comment will be logged in the profile
        #   collection along with the query.
        #
        # @param [ String ] comment The comment to be associated with the query.
        #
        # @return [ String, View ] Either the comment or a
        #   new +View+.
        #
        # @since 2.0.0
        def comment(comment = nil)
          configure(:comment, comment)
        end

        # Get a count of matching documents in the collection.
        #
        # @example Get the number of documents in the collection.
        #   collection_view.count
        #
        # @param [ Hash ] opts Options for the operation.
        #
        # @option opts [ Integer ] :skip The number of documents to skip.
        # @option opts [ Hash ] :hint Override default index selection and force
        #   MongoDB to use a specific index for the query.
        # @option opts [ Integer ] :limit Max number of docs to count.
        # @option opts [ Integer ] :max_time_ms The maximum amount of time to allow the
        #   command to run.
        # @option opts [ Hash ] :read The read preference options.
        # @option opts [ Hash ] :collation The collation to use.
        # @option opts [ Mongo::Session ] :session The session to use for the operation.
        #
        # @return [ Integer ] The document count.
        #
        # @since 2.0.0
        #
        # @deprecated Use #count_documents or #estimated_document_count instead. However, note that
        #   the following operators will need to be substituted when switching to #count_documents:
        #     * $where should be replaced with $expr (only works on 3.6+)
        #     * $near should be replaced with $geoWithin with $center
        #     * $nearSphere should be replaced with $geoWithin with $centerSphere
        def count(opts = {})
          cmd = { :count => collection.name, :query => filter }
          cmd[:skip] = opts[:skip] if opts[:skip]
          cmd[:hint] = opts[:hint] if opts[:hint]
          cmd[:limit] = opts[:limit] if opts[:limit]
          if read_concern
            cmd[:readConcern] = Options::Mapper.transform_values_to_strings(
              read_concern)
          end
          cmd[:maxTimeMS] = opts[:max_time_ms] if opts[:max_time_ms]
          Mongo::Lint.validate_underscore_read_preference(opts[:read])
          read_pref = opts[:read] || read_preference
          selector = ServerSelector.get(read_pref || server_selector)
          with_session(opts) do |session|
            read_with_retry(session, selector) do |server|
              Operation::Count.new(
                selector: cmd,
                db_name: database.name,
                options: {:limit => -1},
                read: read_pref,
                session: session,
                # For some reason collation was historically accepted as a
                # string key. Note that this isn't documented as valid usage.
                collation: opts[:collation] || opts['collation'] || collation,
              ).execute(server, context: Operation::Context.new(client: client, session: session))
            end.n.to_i
          end
        end

        # Get a count of matching documents in the collection.
        #
        # The count is computed by running an aggregation made of a $match on
        # this view's filter, optional $skip / $limit stages, and a $group
        # stage that sums the matching documents.
        #
        # @example Get the number of documents in the collection.
        #   collection_view.count_documents
        #
        # @param [ Hash ] opts Options for the operation.
        #
        # @option opts [ Integer ] :skip The number of documents to skip.
        # @option opts [ Hash ] :hint Override default index selection and force
        #   MongoDB to use a specific index for the query. Requires server version 3.6+.
        # @option opts [ Integer ] :limit Max number of docs to count.
        # @option opts [ Integer ] :max_time_ms The maximum amount of time to allow the
        #   command to run.
        # @option opts [ Hash ] :read The read preference options.
        # @option opts [ Hash ] :collation The collation to use.
        # @option opts [ Mongo::Session ] :session The session to use for the operation.
        #
        # @return [ Integer ] The document count; 0 when the aggregation
        #   returns no document.
        #
        # @since 2.6.0
        def count_documents(opts = {})
          pipeline = [:'$match' => filter]
          pipeline << { :'$skip' => opts[:skip] } if opts[:skip]
          pipeline << { :'$limit' => opts[:limit] } if opts[:limit]
          pipeline << { :'$group' => { _id: 1, n: { :'$sum' => 1 } } }

          # Only these options are forwarded to the aggregation; :skip and
          # :limit have already been turned into pipeline stages above.
          opts = opts.select { |k, _| [:hint, :max_time_ms, :read, :collation, :session].include?(k) }
          opts[:collation] ||= collation

          first = aggregate(pipeline, opts).first
          # $group emits nothing when no documents match.
          return 0 unless first
          first['n'].to_i
        end

        # Gets an estimate of the count of documents in a collection using collection metadata.
        #
        # On servers 5.0+ this runs a $collStats aggregation; on older servers
        # it runs the count command without a query. A NamespaceNotFound
        # failure (code 26) is reported as a count of 0.
        #
        # @example Get the number of documents in the collection.
        #   collection_view.estimated_document_count
        #
        # @param [ Hash ] opts Options for the operation.
        #
        # @option opts [ Integer ] :max_time_ms The maximum amount of time to allow the command to
        #   run.
        # @option opts [ Hash ] :read The read preference options.
        # @option opts [ Mongo::Session ] :session The session to use for the operation.
        #
        # @return [ Integer ] The document count.
        #
        # @raise [ ArgumentError ] If the view has a filter, a limit or a skip.
        #
        # @since 2.6.0
        def estimated_document_count(opts = {})
          # Use +filter+ directly, like every other reader in this module.
          unless filter.empty?
            raise ArgumentError, "Cannot call estimated_document_count when querying with a filter"
          end

          %i[limit skip].each do |opt|
            if @options.key?(opt)
              raise ArgumentError, "Cannot call estimated_document_count when querying with #{opt}"
            end
          end

          Mongo::Lint.validate_underscore_read_preference(opts[:read])
          read_pref = opts[:read] || read_preference
          selector = ServerSelector.get(read_pref || server_selector)
          with_session(opts) do |session|
            read_with_retry(session, selector) do |server|
              context = Operation::Context.new(client: client, session: session)
              if server.description.server_version_gte?('5.0')
                pipeline = [
                  {'$collStats' => {'count' => {}}},
                  {'$group' => {'_id' => 1, 'n' => {'$sum' => '$count'}}},
                ]
                agg_options = options.merge(session: session)
                # Honor the documented :max_time_ms option on this path too;
                # previously it was only applied to the count command below.
                agg_options[:max_time_ms] = opts[:max_time_ms] if opts[:max_time_ms]
                spec = Builder::Aggregation.new(
                  pipeline,
                  self,
                  agg_options
                ).specification
                result = Operation::Aggregate.new(spec).execute(server, context: context)
                # $group produces no output document when it receives no input,
                # so guard against an empty result instead of calling #fetch on nil.
                doc = result.documents.first
                doc ? doc.fetch('n') : 0
              else
                cmd = { count: collection.name }
                cmd[:maxTimeMS] = opts[:max_time_ms] if opts[:max_time_ms]
                if read_concern
                  cmd[:readConcern] = Options::Mapper.transform_values_to_strings(
                    read_concern)
                end
                result = Operation::Count.new(
                  selector: cmd,
                  db_name: database.name,
                  read: read_pref,
                  session: session,
                ).execute(server, context: context)
                result.n.to_i
              end
            end
          end
        rescue Error::OperationFailure => exc
          if exc.code == 26
            # NamespaceNotFound
            # This should only happen with the aggregation pipeline path
            # (server 5.0+). Previous servers should return 0 for nonexistent
            # collections.
            0
          else
            raise
          end
        end

        # Get a list of distinct values for a specific field.
        #
        # @example Get the distinct values.
        #   collection_view.distinct('name')
        #
        # @param [ String, Symbol ] field_name The name of the field.
        # @param [ Hash ] opts Options for the distinct command.
        #
        # @option opts [ Integer ] :max_time_ms The maximum amount of time to allow the
        #   command to run.
        # @option opts [ Hash ] :read The read preference options.
        # @option opts [ Hash ] :collation The collation to use.
        # @option opts [ Mongo::Session ] :session The session to use for the operation.
        #
        # @return [ Array ] The list of distinct values.
        #
        # @raise [ ArgumentError ] If field_name is nil.
        #
        # @since 2.0.0
        def distinct(field_name, opts = {})
          if field_name.nil?
            raise ArgumentError, 'Field name for distinct operation must be not nil'
          end
          cmd = { :distinct => collection.name,
                  :key => field_name.to_s,
                  :query => filter }
          cmd[:maxTimeMS] = opts[:max_time_ms] if opts[:max_time_ms]
          if read_concern
            cmd[:readConcern] = Options::Mapper.transform_values_to_strings(
              read_concern)
          end
          Mongo::Lint.validate_underscore_read_preference(opts[:read])
          read_pref = opts[:read] || read_preference
          selector = ServerSelector.get(read_pref || server_selector)
          with_session(opts) do |session|
            read_with_retry(session, selector) do |server|
              Operation::Distinct.new(
                selector: cmd,
                db_name: database.name,
                options: {:limit => -1},
                read: read_pref,
                session: session,
                # For some reason collation was historically accepted as a
                # string key. Note that this isn't documented as valid usage.
                collation: opts[:collation] || opts['collation'] || collation,
              ).execute(server, context: Operation::Context.new(client: client, session: session))
            end.first['values']
          end
        end

        # The index that MongoDB will be forced to use for the query.
        #
        # @example Set the index hint.
        #   view.hint(name: 1)
        #
        # @param [ Hash ] hint The index to use for the query.
        #
        # @return [ Hash, View ] Either the hint or a new +View+.
        #
        # @since 2.0.0
        def hint(hint = nil)
          configure(:hint, hint)
        end

        # The max number of docs to return from the query.
        #
        # @example Set the limit.
        #   view.limit(5)
        #
        # @param [ Integer ] limit The number of docs to return.
        #
        # @return [ Integer, View ] Either the limit or a new +View+.
        #
        # @since 2.0.0
        def limit(limit = nil)
          configure(:limit, limit)
        end

        # Execute a map/reduce operation on the collection view.
        #
        # @example Execute a map/reduce.
        #   view.map_reduce(map, reduce)
        #
        # @param [ String ] map The map js function.
        # @param [ String ] reduce The reduce js function.
        # @param [ Hash ] options The map/reduce options; merged over this
        #   view's options.
        #
        # @return [ MapReduce ] The map reduce wrapper.
        #
        # @since 2.0.0
        def map_reduce(map, reduce, options = {})
          MapReduce.new(self, map, reduce, @options.merge(options))
        end

        # Set the max number of documents to scan.
        #
        # @example Set the max scan value.
        #   view.max_scan(1000)
        #
        # @param [ Integer ] value The max number to scan.
        #
        # @return [ Integer, View ] The value or a new +View+.
        #
        # @since 2.0.0
        #
        # @deprecated This option is deprecated as of MongoDB server
        #   version 4.0.
        def max_scan(value = nil)
          configure(:max_scan, value)
        end

        # Set the maximum value to search.
        #
        # @example Set the max value.
        #   view.max_value(_id: 1)
        #
        # @param [ Hash ] value The max field and value.
        #
        # @return [ Hash, View ] The value or a new +View+.
        #
        # @since 2.1.0
        def max_value(value = nil)
          configure(:max_value, value)
        end

        # Set the minimum value to search.
        #
        # @example Set the min value.
        #   view.min_value(_id: 1)
        #
        # @param [ Hash ] value The min field and value.
        #
        # @return [ Hash, View ] The value or a new +View+.
        #
        # @since 2.1.0
        def min_value(value = nil)
          configure(:min_value, value)
        end

        # The server normally times out idle cursors after an inactivity period
        # (10 minutes) to prevent excess memory use. Set this option to prevent that.
        #
        # @example Set the cursor to not timeout.
        #   view.no_cursor_timeout
        #
        # @return [ View ] The new view.
        #
        # @since 2.0.0
        def no_cursor_timeout
          configure(:no_cursor_timeout, true)
        end

        # The fields to include or exclude from each doc in the result set.
        #
        # @example Set the fields to include or exclude.
        #   view.projection(name: 1)
        #
        # @note A value of 0 excludes a field from the doc. A value of 1 includes it.
        #   Values must all be 0 or all be 1, with the exception of the _id value.
        #   The _id field is included by default. It must be excluded explicitly.
        #
        # @param [ Hash ] document The field and 1 or 0, to include or exclude it.
        #
        # @return [ Hash, View ] Either the fields or a new +View+.
        #
        # @raise [ Error::InvalidDocument ] If document does not respond to #keys.
        #
        # @since 2.0.0
        def projection(document = nil)
          validate_doc!(document) if document
          configure(:projection, document)
        end

        # The read preference to use for the query.
        #
        # @example Set the read preference.
        #   view.read(mode: :secondary)
        #
        # @note If none is specified for the query, the read preference of the
        #   collection will be used.
        #
        # @param [ Hash ] value The read preference mode to use for the query.
        #
        # @return [ Symbol, View ] Either the read preference or a
        #   new +View+.
        #
        # @since 2.0.0
        def read(value = nil)
          return read_preference if value.nil?
          configure(:read, value)
        end

        # Set whether to return only the indexed field or fields.
        #
        # @example Set the return key value.
        #   view.return_key(true)
        #
        # @param [ true, false ] value The return key value.
        #
        # @return [ true, false, View ] The value or a new +View+.
        #
        # @since 2.1.0
        def return_key(value = nil)
          configure(:return_key, value)
        end

        # Set whether the disk location should be shown for each document.
        #
        # @example Set show disk location option.
        #   view.show_disk_loc(true)
        #
        # @param [ true, false ] value The value for the field.
        #
        # @return [ true, false, View ] Either the value or a new
        #   +View+.
        #
        # @since 2.0.0
        def show_disk_loc(value = nil)
          configure(:show_disk_loc, value)
        end
        alias :show_record_id :show_disk_loc

        # The number of docs to skip before returning results.
        #
        # @example Set the number to skip.
        #   view.skip(10)
        #
        # @param [ Integer ] number Number of docs to skip.
        #
        # @return [ Integer, View ] Either the skip value or a
        #   new +View+.
        #
        # @since 2.0.0
        def skip(number = nil)
          configure(:skip, number)
        end

        # Set the snapshot value for the view.
        #
        # @note When set to true, prevents documents from returning more than
        #   once.
        #
        # @example Set the snapshot value.
        #   view.snapshot(true)
        #
        # @param [ true, false ] value The snapshot value.
        #
        # @since 2.0.0
        #
        # @deprecated This option is deprecated as of MongoDB server
        #   version 4.0.
        def snapshot(value = nil)
          configure(:snapshot, value)
        end

        # The key and direction pairs by which the result set will be sorted.
        #
        # @example Set the sort criteria
        #   view.sort(name: -1)
        #
        # @param [ Hash ] spec The attributes and directions to sort by.
        #
        # @return [ Hash, View ] Either the sort setting or a
        #   new +View+.
        #
        # @since 2.0.0
        def sort(spec = nil)
          configure(:sort, spec)
        end

        # If called without arguments or with a nil argument, returns
        # the legacy (OP_QUERY) server modifiers for the current view.
        # If called with a non-nil argument, which must be a Hash or a
        # subclass, merges the provided modifiers into the current view.
        # Both string and symbol keys are allowed in the input hash.
        #
        # @example Set the modifiers document.
        #   view.modifiers(:$orderby => Mongo::Index::ASCENDING)
        #
        # @param [ Hash ] doc The modifiers document.
        #
        # @return [ Hash, View ] Either the modifiers document or a new +View+.
        #
        # @since 2.1.0
        def modifiers(doc = nil)
          if doc.nil?
            Operation::Find::Builder::Modifiers.map_server_modifiers(options)
          else
            new(options.merge(Operation::Find::Builder::Modifiers.map_driver_options(BSON::Document.new(doc))))
          end
        end

        # A cumulative time limit in milliseconds for processing get more operations
        # on a cursor.
        #
        # @example Set the max await time ms value.
        #   view.max_await_time_ms(500)
        #
        # @param [ Integer ] max The max time in milliseconds.
        #
        # @return [ Integer, View ] Either the max await time ms value or a new +View+.
        #
        # @since 2.1.0
        def max_await_time_ms(max = nil)
          configure(:max_await_time_ms, max)
        end

        # A cumulative time limit in milliseconds for processing operations on a cursor.
        #
        # @example Set the max time ms value.
        #   view.max_time_ms(500)
        #
        # @param [ Integer ] max The max time in milliseconds.
        #
        # @return [ Integer, View ] Either the max time ms value or a new +View+.
        #
        # @since 2.1.0
        def max_time_ms(max = nil)
          configure(:max_time_ms, max)
        end

        # The type of cursor to use. Can be :tailable or :tailable_await.
        #
        # @example Set the cursor type.
        #   view.cursor_type(:tailable)
        #
        # @param [ :tailable, :tailable_await ] type The cursor type.
        #
        # @return [ :tailable, :tailable_await, View ] Either the cursor type setting or a new +View+.
        #
        # @since 2.3.0
        def cursor_type(type = nil)
          configure(:cursor_type, type)
        end

        # The read concern for operations on this view: inside a transaction,
        # the transaction's read concern falling back to the client's;
        # otherwise the collection's read concern.
        #
        # @api private
        def read_concern
          if options[:session] && options[:session].in_transaction?
            options[:session].send(:txn_read_concern) || collection.client.read_concern
          else
            collection.read_concern
          end
        end

        # The read preference for operations on this view (memoized).
        #
        # @api private
        def read_preference
          @read_preference ||= begin
            # Operation read preference is always respected, and has the
            # highest priority. If we are in a transaction, we look at
            # transaction read preference and default to client, ignoring
            # collection read preference. If we are not in transaction we
            # look at collection read preference which defaults to client.
            rp = if options[:read]
              options[:read]
            elsif options[:session] && options[:session].in_transaction?
              options[:session].txn_read_preference || collection.client.read_preference
            else
              collection.read_preference
            end
            Lint.validate_underscore_read_preference(rp)
            rp
          end
        end

        private

        # Get or set the collation for this view.
        def collation(doc = nil)
          configure(:collation, doc)
        end

        # The server selector for this view (memoized). Inside a transaction
        # it falls back to the client's selector, otherwise to the
        # collection's.
        def server_selector
          @server_selector ||= if options[:session] && options[:session].in_transaction?
            ServerSelector.get(read_preference || client.server_selector)
          else
            ServerSelector.get(read_preference || collection.server_selector)
          end
        end

        # Runs a parallelScan command and opens one cursor per returned
        # cursor id by issuing an initial GetMore for each.
        #
        # @param [ Integer ] cursor_count The number of cursors requested.
        # @param [ Hash ] options Extra command spec entries; merged over the
        #   defaults (including :session).
        #
        # @return [ Array<Cursor> ] One cursor per id returned by the server.
        def parallel_scan(cursor_count, options = {})
          if options[:session]
            # The session would be overwritten by the one in +options+ later.
            session = client.send(:get_session, @options)
          else
            session = nil
          end
          server = server_selector.select_server(cluster, nil, session)
          spec = {
            coll_name: collection.name,
            db_name: database.name,
            cursor_count: cursor_count,
            read_concern: read_concern,
            session: session,
          }.update(options)
          session = spec[:session]
          op = Operation::ParallelScan.new(spec)
          # Note that the context object shouldn't be reused for subsequent
          # GetMore operations.
          context = Operation::Context.new(client: client, session: session)
          scan_result = op.execute(server, context: context)

          # Block-local names are used below on purpose: Ruby blocks close over
          # outer locals, so reassigning +scan_result+ inside the block would
          # make later iterations read the previous GetMore's connection
          # description instead of the parallelScan one.
          scan_result.cursor_ids.map do |cursor_id|
            get_more_spec = {
              cursor_id: cursor_id,
              coll_name: collection.name,
              db_name: database.name,
              session: session,
              batch_size: batch_size,
              to_return: 0,
              # max_time_ms is not being passed here, I assume intentionally?
            }
            get_more_context = Operation::Context.new(
              client: client,
              session: session,
              service_id: scan_result.connection_description.service_id,
            )
            get_more_result = Operation::GetMore.new(get_more_spec).execute(server, context: get_more_context)
            Cursor.new(self, get_more_result, server, session: session)
          end
        end

        # Raises unless +doc+ looks like a document (responds to #keys).
        #
        # @raise [ Error::InvalidDocument ] If doc does not respond to #keys.
        def validate_doc!(doc)
          raise Error::InvalidDocument.new unless doc.respond_to?(:keys)
        end
      end
    end
  end
end