lib/mongo/database/view.rb in mongo-2.20.1 vs lib/mongo/database/view.rb in mongo-2.21.0

- old
+ new

@@ -13,20 +13,24 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/cursor/nontailable' + module Mongo class Database # A class representing a view of a database. # # @since 2.0.0 class View extend Forwardable include Enumerable include Retryable + include Mongo::CursorHost + include Cursor::NonTailable def_delegators :@database, :cluster, :read_preference, :client # @api private def_delegators :@database, :server_selector, :read_concern, :write_concern def_delegators :cluster, :next_primary @@ -54,23 +58,32 @@ # @option options [ true, false ] :authorized_collections A flag, when # set to true, that allows a user without the required privilege # to run the command when access control is enforced. # @option options [ Object ] :comment A user-provided # comment to attach to this command. + # @option options [ Integer ] :timeout_ms The operation timeout in milliseconds. + # Must be a non-negative integer. An explicit value of 0 means infinite. + # The default value is unset which means the value is inherited from + # the database or the client. # # See https://mongodb.com/docs/manual/reference/command/listCollections/ # for more information and usage. # @option options [ Session ] :session The session to use. # # @return [ Array<String> ] The names of all non-system collections. # # @since 2.0.0 def collection_names(options = {}) @batch_size = options[:batch_size] - session = client.send(:get_session, options) - cursor = read_with_retry_cursor(session, ServerSelector.primary, self) do |server| - send_initial_query(server, session, options.merge(name_only: true)) + session = client.get_session(options) + context = Operation::Context.new( + client: client, + session: session, + operation_timeouts: operation_timeouts(options) + ) + cursor = read_with_retry_cursor(session, ServerSelector.primary, self, context: context) do |server| + send_initial_query(server, session, context, options.merge(name_only: true)) end cursor.map do |info| if cursor.initial_result.connection_description.features.list_collections_enabled? info['name'] else @@ -110,32 +123,51 @@ # # @return [ Array<Hash> ] Info for each collection in the database. # # @since 2.0.5 def list_collections(options = {}) - session = client.send(:get_session, options) + session = client.get_session(options) collections_info(session, ServerSelector.primary, options) end # Create the new database view. # # @example Create the new database view. - # View::Index.new(database) + # Database::View.new(database) # # @param [ Database ] database The database. + # @param [ Hash ] options The options to configure the view with. # + # @option options [ :cursor_lifetime | :iteration ] :timeout_mode How to interpret + # :timeout_ms (whether it applies to the lifetime of the cursor, or per + # iteration). + # @option options [ Integer ] :timeout_ms The operation timeout in milliseconds. + # Must be a non-negative integer. An explicit value of 0 means infinite. + # The default value is unset which means the value is inherited from + # the database or the client. + # # @since 2.0.0 - def initialize(database) + def initialize(database, options = {}) @database = database + @operation_timeout_ms = options.delete(:timeout_ms) + + validate_timeout_mode!(options) + @batch_size = nil @limit = nil @collection = @database[Database::COMMAND] end # @api private attr_reader :database + # @return [ Integer | nil | The timeout_ms value that was passed as an + # option to the view. + # + # @api private + attr_reader :operation_timeout_ms + # Execute an aggregation on the database view. # # @example Aggregate documents. # view.aggregate([ # { "$listLocalSessions" => {} } @@ -150,19 +182,45 @@ # @api private def aggregate(pipeline, options = {}) Collection::View::Aggregation.new(self, pipeline, options) end + # The timeout_ms value to use for this operation; either specified as an + # option to the view, or inherited from the database. + # + # @return [ Integer | nil ] the timeout_ms for this operation + def timeout_ms + operation_timeout_ms || database.timeout_ms + end + + # @return [ Hash ] timeout_ms value set on the operation level (if any). + # + # @api private + def operation_timeouts(opts = {}) + {}.tap do |result| + if opts[:timeout_ms] || operation_timeout_ms + result[:operation_timeout_ms] = opts.delete(:timeout_ms) || operation_timeout_ms + else + result[:inherited_timeout_ms] = database.timeout_ms + end + end + end + private def collections_info(session, server_selector, options = {}, &block) description = nil - cursor = read_with_retry_cursor(session, server_selector, self) do |server| + context = Operation::Context.new( + client: client, + session: session, + operation_timeouts: operation_timeouts(options) + ) + cursor = read_with_retry_cursor(session, server_selector, self, context: context) do |server| # TODO take description from the connection used to send the query # once https://jira.mongodb.org/browse/RUBY-1601 is fixed. description = server.description - send_initial_query(server, session, options) + send_initial_query(server, session, context, options) end # On 3.0+ servers, we get just the collection names. # On 2.6 server, we get collection names prefixed with the database # name. We need to filter system collections out here because # in the caller we don't know which server version executed the @@ -222,20 +280,29 @@ # @option options [ true | false | nil ] :deserialize_as_bson Whether the # query results should be deserialized to BSON types, or to Ruby # types (where possible). # # @return [ Operation::Result ] Result of the query. - def send_initial_query(server, session, options = {}) + def send_initial_query(server, session, context, options = {}) opts = options.dup execution_opts = {} if opts.key?(:deserialize_as_bson) execution_opts[:deserialize_as_bson] = opts.delete(:deserialize_as_bson) end - initial_query_op(session, opts).execute( - server, - context: Operation::Context.new(client: client, session: session), - options: execution_opts - ) + if server.load_balancer? + connection = server.pool.check_out(context: context) + initial_query_op(session, opts).execute_with_connection( + connection, + context: context, + options: execution_opts + ) + else + initial_query_op(session, opts).execute( + server, + context: context, + options: execution_opts + ) + end end end end end