lib/mongo/database/view.rb in mongo-2.20.1 vs lib/mongo/database/view.rb in mongo-2.21.0
- old
+ new
@@ -13,20 +13,24 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+require 'mongo/cursor/nontailable'
+
module Mongo
class Database
# A class representing a view of a database.
#
# @since 2.0.0
class View
extend Forwardable
include Enumerable
include Retryable
+ include Mongo::CursorHost
+ include Cursor::NonTailable
def_delegators :@database, :cluster, :read_preference, :client
# @api private
def_delegators :@database, :server_selector, :read_concern, :write_concern
def_delegators :cluster, :next_primary
@@ -54,23 +58,32 @@
# @option options [ true, false ] :authorized_collections A flag, when
# set to true, that allows a user without the required privilege
# to run the command when access control is enforced.
# @option options [ Object ] :comment A user-provided
# comment to attach to this command.
+ # @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
+ # Must be a non-negative integer. An explicit value of 0 means infinite.
+ # The default value is unset which means the value is inherited from
+ # the database or the client.
#
# See https://mongodb.com/docs/manual/reference/command/listCollections/
# for more information and usage.
# @option options [ Session ] :session The session to use.
#
# @return [ Array<String> ] The names of all non-system collections.
#
# @since 2.0.0
def collection_names(options = {})
@batch_size = options[:batch_size]
- session = client.send(:get_session, options)
- cursor = read_with_retry_cursor(session, ServerSelector.primary, self) do |server|
- send_initial_query(server, session, options.merge(name_only: true))
+ session = client.get_session(options)
+ context = Operation::Context.new(
+ client: client,
+ session: session,
+ operation_timeouts: operation_timeouts(options)
+ )
+ cursor = read_with_retry_cursor(session, ServerSelector.primary, self, context: context) do |server|
+ send_initial_query(server, session, context, options.merge(name_only: true))
end
cursor.map do |info|
if cursor.initial_result.connection_description.features.list_collections_enabled?
info['name']
else
@@ -110,32 +123,51 @@
#
# @return [ Array<Hash> ] Info for each collection in the database.
#
# @since 2.0.5
def list_collections(options = {})
- session = client.send(:get_session, options)
+ session = client.get_session(options)
collections_info(session, ServerSelector.primary, options)
end
# Create the new database view.
#
# @example Create the new database view.
- # View::Index.new(database)
+ # Database::View.new(database)
#
# @param [ Database ] database The database.
+ # @param [ Hash ] options The options to configure the view with.
#
+ # @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret
+ # :timeout_ms (whether it applies to the lifetime of the cursor, or per
+ # iteration).
+ # @option options [ Integer ] :timeout_ms The operation timeout in milliseconds.
+ # Must be a non-negative integer. An explicit value of 0 means infinite.
+ # The default value is unset which means the value is inherited from
+ # the database or the client.
+ #
# @since 2.0.0
- def initialize(database)
+ def initialize(database, options = {})
@database = database
+ @operation_timeout_ms = options.delete(:timeout_ms)
+
+ validate_timeout_mode!(options)
+
@batch_size = nil
@limit = nil
@collection = @database[Database::COMMAND]
end
# @api private
attr_reader :database
+ # @return [ Integer | nil | The timeout_ms value that was passed as an
+ # option to the view.
+ #
+ # @api private
+ attr_reader :operation_timeout_ms
+
# Execute an aggregation on the database view.
#
# @example Aggregate documents.
# view.aggregate([
# { "$listLocalSessions" => {} }
@@ -150,19 +182,45 @@
# @api private
def aggregate(pipeline, options = {})
Collection::View::Aggregation.new(self, pipeline, options)
end
+ # The timeout_ms value to use for this operation; either specified as an
+ # option to the view, or inherited from the database.
+ #
+ # @return [ Integer | nil ] the timeout_ms for this operation
+ def timeout_ms
+ operation_timeout_ms || database.timeout_ms
+ end
+
+ # @return [ Hash ] timeout_ms value set on the operation level (if any).
+ #
+ # @api private
+ def operation_timeouts(opts = {})
+ {}.tap do |result|
+ if opts[:timeout_ms] || operation_timeout_ms
+ result[:operation_timeout_ms] = opts.delete(:timeout_ms) || operation_timeout_ms
+ else
+ result[:inherited_timeout_ms] = database.timeout_ms
+ end
+ end
+ end
+
private
def collections_info(session, server_selector, options = {}, &block)
description = nil
- cursor = read_with_retry_cursor(session, server_selector, self) do |server|
+ context = Operation::Context.new(
+ client: client,
+ session: session,
+ operation_timeouts: operation_timeouts(options)
+ )
+ cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server|
# TODO take description from the connection used to send the query
# once https://jira.mongodb.org/browse/RUBY-1601 is fixed.
description = server.description
- send_initial_query(server, session, options)
+ send_initial_query(server, session, context, options)
end
# On 3.0+ servers, we get just the collection names.
# On 2.6 server, we get collection names prefixed with the database
# name. We need to filter system collections out here because
# in the caller we don't know which server version executed the
@@ -222,20 +280,29 @@
# @option options [ true | false | nil ] :deserialize_as_bson Whether the
# query results should be deserialized to BSON types, or to Ruby
# types (where possible).
#
# @return [ Operation::Result ] Result of the query.
- def send_initial_query(server, session, options = {})
+ def send_initial_query(server, session, context, options = {})
opts = options.dup
execution_opts = {}
if opts.key?(:deserialize_as_bson)
execution_opts[:deserialize_as_bson] = opts.delete(:deserialize_as_bson)
end
- initial_query_op(session, opts).execute(
- server,
- context: Operation::Context.new(client: client, session: session),
- options: execution_opts
- )
+ if server.load_balancer?
+ connection = server.pool.check_out(context: context)
+ initial_query_op(session, opts).execute_with_connection(
+ connection,
+ context: context,
+ options: execution_opts
+ )
+ else
+ initial_query_op(session, opts).execute(
+ server,
+ context: context,
+ options: execution_opts
+ )
+ end
end
end
end
end