lib/mongo/collection/view/readable.rb in mongo-2.15.1 vs lib/mongo/collection/view/readable.rb in mongo-2.16.0.alpha1
- old
+ new
@@ -22,20 +22,10 @@
# Defines read related behavior for collection view.
#
# @since 2.0.0
module Readable
- # The query modifier constant.
- #
- # @since 2.2.0
- QUERY = '$query'.freeze
-
- # The modifiers option constant.
- #
- # @since 2.2.0
- MODIFIERS = 'modifiers'.freeze
-
# Execute an aggregation on the collection view.
#
# @example Aggregate documents.
# view.aggregate([
# { "$group" => { "_id" => "$city", "tpop" => { "$sum" => "$pop" }}}
@@ -167,17 +157,19 @@
Mongo::Lint.validate_underscore_read_preference(opts[:read])
read_pref = opts[:read] || read_preference
selector = ServerSelector.get(read_pref || server_selector)
with_session(opts) do |session|
read_with_retry(session, selector) do |server|
- apply_collation!(cmd, server, opts)
Operation::Count.new(
- :selector => cmd,
- :db_name => database.name,
- :options => {:limit => -1},
- :read => read_pref,
- :session => session,
+ selector: cmd,
+ db_name: database.name,
+ options: {:limit => -1},
+ read: read_pref,
+ session: session,
+ # For some reason collation was historically accepted as a
+ # string key. Note that this isn't documented as valid usage.
+ collation: opts[:collation] || opts['collation'] || collation,
).execute(server, context: Operation::Context.new(client: client, session: session))
end.n.to_i
end
end
@@ -241,12 +233,12 @@
Mongo::Lint.validate_underscore_read_preference(opts[:read])
read_pref = opts[:read] || read_preference
selector = ServerSelector.get(read_pref || server_selector)
with_session(opts) do |session|
- context = Operation::Context.new(client: client, session: session)
read_with_retry(session, selector) do |server|
+ context = Operation::Context.new(client: client, session: session)
if server.description.server_version_gte?('5.0')
pipeline = [
{'$collStats' => {'count' => {}}},
{'$group' => {'_id' => 1, 'n' => {'$sum' => '$count'}}},
]
@@ -313,18 +305,20 @@
Mongo::Lint.validate_underscore_read_preference(opts[:read])
read_pref = opts[:read] || read_preference
selector = ServerSelector.get(read_pref || server_selector)
with_session(opts) do |session|
read_with_retry(session, selector) do |server|
- apply_collation!(cmd, server, opts)
- Operation::Distinct.new({
- :selector => cmd,
- :db_name => database.name,
- :options => {:limit => -1},
- :read => read_pref,
- :session => session,
- }).execute(server, context: Operation::Context.new(client: client, session: session))
+ Operation::Distinct.new(
+ selector: cmd,
+ db_name: database.name,
+ options: {:limit => -1},
+ read: read_pref,
+ session: session,
+ # For some reason collation was historically accepted as a
+ # string key. Note that this isn't documented as valid usage.
+ collation: opts[:collation] || opts['collation'] || collation,
+ ).execute(server, context: Operation::Context.new(client: client, session: session))
end.first['values']
end
end
# The index that MongoDB will be forced to use for the query.
@@ -540,23 +534,30 @@
# @since 2.0.0
def sort(spec = nil)
configure(:sort, spec)
end
- # “meta” operators that let you modify the output or behavior of a query.
+ # If called without arguments or with a nil argument, returns
+ # the legacy (OP_QUERY) server modifiers for the current view.
+ # If called with a non-nil argument, which must be a Hash or a
+ # subclass, merges the provided modifiers into the current view.
+ # Both string and symbol keys are allowed in the input hash.
#
# @example Set the modifiers document.
# view.modifiers(:$orderby => Mongo::Index::ASCENDING)
#
# @param [ Hash ] doc The modifiers document.
#
# @return [ Hash, View ] Either the modifiers document or a new +View+.
#
# @since 2.1.0
def modifiers(doc = nil)
- return Builder::Modifiers.map_server_modifiers(options) if doc.nil?
- new(options.merge(Builder::Modifiers.map_driver_options(doc)))
+ if doc.nil?
+ Operation::Find::Builder::Modifiers.map_server_modifiers(options)
+ else
+ new(options.merge(Operation::Find::Builder::Modifiers.map_driver_options(BSON::Document.new(doc))))
+ end
end
# A cumulative time limit in milliseconds for processing get more operations
# on a cursor.
#
@@ -643,37 +644,45 @@
end
end
def parallel_scan(cursor_count, options = {})
if options[:session]
+ # The session would be overwritten by the one in +options+ later.
session = client.send(:get_session, @options)
else
session = nil
end
server = server_selector.select_server(cluster, nil, session)
- cmd = Operation::ParallelScan.new({
- :coll_name => collection.name,
- :db_name => database.name,
- :cursor_count => cursor_count,
- :read_concern => read_concern,
- :session => session,
- }.merge!(options))
- cmd.execute(server, context: Operation::Context.new(client: client, session: session)).cursor_ids.map do |cursor_id|
- result = if server.with_connection { |connection| connection.features }.find_command_enabled?
- Operation::GetMore.new({
- :selector => {:getMore => BSON::Int64.new(cursor_id),
- :collection => collection.name},
- :db_name => database.name,
- :session => session,
- }).execute(server, context: Operation::Context.new(client: client, session: session))
- else
- Operation::GetMore.new({
- :to_return => 0,
- :cursor_id => BSON::Int64.new(cursor_id),
- :db_name => database.name,
- :coll_name => collection.name
- }).execute(server, context: Operation::Context.new(client: client, session: session))
- end
+ spec = {
+ coll_name: collection.name,
+ db_name: database.name,
+ cursor_count: cursor_count,
+ read_concern: read_concern,
+ session: session,
+ }.update(options)
+ session = spec[:session]
+ op = Operation::ParallelScan.new(spec)
+ # Note that the context object shouldn't be reused for subsequent
+ # GetMore operations.
+ context = Operation::Context.new(client: client, session: session)
+ result = op.execute(server, context: context)
+ result.cursor_ids.map do |cursor_id|
+ spec = {
+ cursor_id: cursor_id,
+ coll_name: collection.name,
+ db_name: database.name,
+ session: session,
+ batch_size: batch_size,
+ to_return: 0,
+ # max_time_ms is not being passed here, I assume intentionally?
+ }
+ op = Operation::GetMore.new(spec)
+ context = Operation::Context.new(
+ client: client,
+ session: session,
+ service_id: result.connection_description.service_id,
+ )
+ result = op.execute(server, context: context)
Cursor.new(self, result, server, session: session)
end
end
def validate_doc!(doc)