lib/mongo/collection/view/readable.rb in mongo-2.15.1 vs lib/mongo/collection/view/readable.rb in mongo-2.16.0.alpha1

- old
+ new

@@ -22,20 +22,10 @@ # Defines read related behavior for collection view. # # @since 2.0.0 module Readable - # The query modifier constant. - # - # @since 2.2.0 - QUERY = '$query'.freeze - - # The modifiers option constant. - # - # @since 2.2.0 - MODIFIERS = 'modifiers'.freeze - # Execute an aggregation on the collection view. # # @example Aggregate documents. # view.aggregate([ # { "$group" => { "_id" => "$city", "tpop" => { "$sum" => "$pop" }}} @@ -167,17 +157,19 @@ Mongo::Lint.validate_underscore_read_preference(opts[:read]) read_pref = opts[:read] || read_preference selector = ServerSelector.get(read_pref || server_selector) with_session(opts) do |session| read_with_retry(session, selector) do |server| - apply_collation!(cmd, server, opts) Operation::Count.new( - :selector => cmd, - :db_name => database.name, - :options => {:limit => -1}, - :read => read_pref, - :session => session, + selector: cmd, + db_name: database.name, + options: {:limit => -1}, + read: read_pref, + session: session, + # For some reason collation was historically accepted as a + # string key. Note that this isn't documented as valid usage. + collation: opts[:collation] || opts['collation'] || collation, ).execute(server, context: Operation::Context.new(client: client, session: session)) end.n.to_i end end @@ -241,12 +233,12 @@ Mongo::Lint.validate_underscore_read_preference(opts[:read]) read_pref = opts[:read] || read_preference selector = ServerSelector.get(read_pref || server_selector) with_session(opts) do |session| - context = Operation::Context.new(client: client, session: session) read_with_retry(session, selector) do |server| + context = Operation::Context.new(client: client, session: session) if server.description.server_version_gte?('5.0') pipeline = [ {'$collStats' => {'count' => {}}}, {'$group' => {'_id' => 1, 'n' => {'$sum' => '$count'}}}, ] @@ -313,18 +305,20 @@ Mongo::Lint.validate_underscore_read_preference(opts[:read]) read_pref = opts[:read] || read_preference selector = ServerSelector.get(read_pref || server_selector) with_session(opts) do |session| read_with_retry(session, selector) do |server| - apply_collation!(cmd, server, opts) - Operation::Distinct.new({ - :selector => cmd, - :db_name => database.name, - :options => {:limit => -1}, - :read => read_pref, - :session => session, - }).execute(server, context: Operation::Context.new(client: client, session: session)) + Operation::Distinct.new( + selector: cmd, + db_name: database.name, + options: {:limit => -1}, + read: read_pref, + session: session, + # For some reason collation was historically accepted as a + # string key. Note that this isn't documented as valid usage. + collation: opts[:collation] || opts['collation'] || collation, + ).execute(server, context: Operation::Context.new(client: client, session: session)) end.first['values'] end end # The index that MongoDB will be forced to use for the query. @@ -540,23 +534,30 @@ # @since 2.0.0 def sort(spec = nil) configure(:sort, spec) end - # “meta” operators that let you modify the output or behavior of a query. + # If called without arguments or with a nil argument, returns + # the legacy (OP_QUERY) server modifiers for the current view. + # If called with a non-nil argument, which must be a Hash or a + # subclass, merges the provided modifiers into the current view. + # Both string and symbol keys are allowed in the input hash. # # @example Set the modifiers document. # view.modifiers(:$orderby => Mongo::Index::ASCENDING) # # @param [ Hash ] doc The modifiers document. # # @return [ Hash, View ] Either the modifiers document or a new +View+. # # @since 2.1.0 def modifiers(doc = nil) - return Builder::Modifiers.map_server_modifiers(options) if doc.nil? - new(options.merge(Builder::Modifiers.map_driver_options(doc))) + if doc.nil? + Operation::Find::Builder::Modifiers.map_server_modifiers(options) + else + new(options.merge(Operation::Find::Builder::Modifiers.map_driver_options(BSON::Document.new(doc)))) + end end # A cumulative time limit in milliseconds for processing get more operations # on a cursor. # @@ -643,37 +644,45 @@ end end def parallel_scan(cursor_count, options = {}) if options[:session] + # The session would be overwritten by the one in +options+ later. session = client.send(:get_session, @options) else session = nil end server = server_selector.select_server(cluster, nil, session) - cmd = Operation::ParallelScan.new({ - :coll_name => collection.name, - :db_name => database.name, - :cursor_count => cursor_count, - :read_concern => read_concern, - :session => session, - }.merge!(options)) - cmd.execute(server, context: Operation::Context.new(client: client, session: session)).cursor_ids.map do |cursor_id| - result = if server.with_connection { |connection| connection.features }.find_command_enabled? - Operation::GetMore.new({ - :selector => {:getMore => BSON::Int64.new(cursor_id), - :collection => collection.name}, - :db_name => database.name, - :session => session, - }).execute(server, context: Operation::Context.new(client: client, session: session)) - else - Operation::GetMore.new({ - :to_return => 0, - :cursor_id => BSON::Int64.new(cursor_id), - :db_name => database.name, - :coll_name => collection.name - }).execute(server, context: Operation::Context.new(client: client, session: session)) - end + spec = { + coll_name: collection.name, + db_name: database.name, + cursor_count: cursor_count, + read_concern: read_concern, + session: session, + }.update(options) + session = spec[:session] + op = Operation::ParallelScan.new(spec) + # Note that the context object shouldn't be reused for subsequent + # GetMore operations. + context = Operation::Context.new(client: client, session: session) + result = op.execute(server, context: context) + result.cursor_ids.map do |cursor_id| + spec = { + cursor_id: cursor_id, + coll_name: collection.name, + db_name: database.name, + session: session, + batch_size: batch_size, + to_return: 0, + # max_time_ms is not being passed here, I assume intentionally? + } + op = Operation::GetMore.new(spec) + context = Operation::Context.new( + client: client, + session: session, + service_id: result.connection_description.service_id, + ) + result = op.execute(server, context: context) Cursor.new(self, result, server, session: session) end end def validate_doc!(doc)