# frozen_string_literal: true
# encoding: utf-8

# Copyright (C) 2014-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'mongo/bulk_write/result'
require 'mongo/bulk_write/transformable'
require 'mongo/bulk_write/validatable'
require 'mongo/bulk_write/combineable'
require 'mongo/bulk_write/ordered_combiner'
require 'mongo/bulk_write/unordered_combiner'
require 'mongo/bulk_write/result_combiner'

module Mongo

  # Executes a batch of write requests (insert/update/replace/delete) against
  # a collection as one logical bulk operation, combining the per-batch
  # results into a single Result via ResultCombiner.
  class BulkWrite
    extend Forwardable
    include Operation::ResponseHandling

    # @return [ Mongo::Collection ] collection The collection.
    attr_reader :collection

    # @return [ Array ] requests The requests.
    attr_reader :requests

    # @return [ Hash, BSON::Document ] options The options.
    attr_reader :options

    # Delegate various methods to the collection.
    def_delegators :@collection,
                   :database,
                   :cluster,
                   :write_with_retry,
                   :nro_write_with_retry,
                   :next_primary

    def_delegators :database, :client

    # Execute the bulk write operation.
    #
    # Combines the requests (ordered or unordered), validates them, and then
    # runs each combined operation inside a (possibly implicit) session.
    # Single-statement operations go through the retryable-write path
    # (write_with_retry, which supplies a transaction number); everything
    # else uses the non-retryable nro_write_with_retry path.
    #
    # @example Execute the bulk write.
    #   bulk_write.execute
    #
    # @return [ Mongo::BulkWrite::Result ] The result.
    #
    # @since 2.1.0
    def execute
      operation_id = Monitoring.next_operation_id
      result_combiner = ResultCombiner.new
      operations = op_combiner.combine
      validate_requests!

      client.send(:with_session, @options) do |session|
        context = Operation::Context.new(client: client, session: session)
        operations.each do |operation|
          if single_statement?(operation)
            write_concern = write_concern(session)
            write_with_retry(write_concern, context: context) do |connection, txn_num, context|
              execute_operation(
                operation.keys.first,
                operation.values.flatten,
                connection,
                context,
                operation_id,
                result_combiner,
                session,
                txn_num)
            end
          else
            # No txn_num here: multi-statement operations are not retryable
            # writes, so execute_operation is invoked without one.
            nro_write_with_retry(write_concern, context: context) do |connection, txn_num, context|
              execute_operation(
                operation.keys.first,
                operation.values.flatten,
                connection,
                context,
                operation_id,
                result_combiner,
                session)
            end
          end
        end
      end
      result_combiner.result
    end

    # Create the new bulk write operation.
    #
    # @api private
    #
    # @example Create an ordered bulk write.
    #   Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}])
    #
    # @example Create an unordered bulk write.
    #   Mongo::BulkWrite.new(collection, [{ insert_one: { _id: 1 }}], ordered: false)
    #
    # @example Create an ordered mixed bulk write.
    #   Mongo::BulkWrite.new(
    #     collection,
    #     [
    #       { insert_one: { _id: 1 }},
    #       { update_one: { filter: { _id: 0 }, update: { '$set' => { name: 'test' }}}},
    #       { delete_one: { filter: { _id: 2 }}}
    #     ]
    #   )
    #
    # @param [ Mongo::Collection ] collection The collection.
    # @param [ Array ] requests The requests, cannot be empty.
    # @param [ Hash, BSON::Document ] options The options.
    #
    # @since 2.1.0
    def initialize(collection, requests, options = {})
      @collection = collection
      @requests = requests
      # Guard against an explicitly passed nil options.
      @options = options || {}
    end

    # Is the bulk write ordered?
    #
    # @api private
    #
    # @example Is the bulk write ordered?
    #   bulk_write.ordered?
    #
    # @return [ true, false ] If the bulk write is ordered. Defaults to true
    #   when the :ordered option is not given.
    #
    # @since 2.1.0
    def ordered?
      @ordered ||= options.fetch(:ordered, true)
    end

    # Get the write concern for the bulk write.
    #
    # @api private
    #
    # @example Get the write concern.
    #   bulk_write.write_concern
    #
    # @return [ WriteConcern ] The write concern.
    #
    # NOTE(review): the result is memoized, so the session argument only has
    # an effect on the first invocation; subsequent calls return the cached
    # write concern regardless of the session passed in.
    #
    # @since 2.1.0
    def write_concern(session = nil)
      @write_concern ||= options[:write_concern] ?
        WriteConcern.get(options[:write_concern]) :
        collection.write_concern_with_session(session)
    end

    private

    # Operation types that are eligible for the retryable (single-statement)
    # write path in #execute.
    SINGLE_STATEMENT_OPS = [ :delete_one,
                             :update_one,
                             :insert_one ].freeze

    # Whether the combined operation consists solely of a retryable
    # single-statement write type.
    def single_statement?(operation)
      SINGLE_STATEMENT_OPS.include?(operation.keys.first)
    end

    # Build the spec hash shared by every bulk sub-operation; each op type
    # merges its own payload key (:deletes/:documents/:updates) into this.
    def base_spec(operation_id, session)
      {
        :db_name => database.name,
        :coll_name => collection.name,
        :write_concern => write_concern(session),
        :ordered => ordered?,
        :operation_id => operation_id,
        :bypass_document_validation => !!options[:bypass_document_validation],
        :max_time_ms => options[:max_time_ms],
        :options => options,
        :id_generator => client.options[:id_generator],
        :session => session,
        :comment => options[:comment],
        :let => options[:let],
      }
    end

    # Execute one combined operation, splitting it in half (recursively, via
    # split_execute) when it exceeds the connection's max write batch size or
    # when the server rejects it as too large (see rescue below).
    def execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num = nil)
      validate_collation!(connection)
      validate_array_filters!(connection)
      validate_hint!(connection)

      unpin_maybe(session, connection) do
        if values.size > connection.description.max_write_batch_size
          split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
        else
          result = send(name, values, connection, context, operation_id, session, txn_num)

          add_server_diagnostics(connection) do
            add_error_labels(connection, context) do
              result_combiner.combine!(result, values.size)
            end
          end
        end
      end
    # With OP_MSG (3.6+ servers), the size of each section in the message
    # is independently capped at 16m and each bulk operation becomes
    # its own section. The size of the entire bulk write is limited to 48m.
    # With OP_QUERY (pre-3.6 servers), the entire bulk write is sent as a
    # single document and is thus subject to the 16m document size limit.
    # This means the splits differ between pre-3.6 and 3.6+ servers, with
    # 3.6+ servers being able to split less.
    rescue Error::MaxBSONSize, Error::MaxMessageSize => e
      # A single document that is still too large cannot be split further.
      raise e if values.size <= 1
      unpin_maybe(session, connection) do
        split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
      end
    end

    # The combiner that merges adjacent same-type requests; ordered bulk
    # writes preserve request order, unordered ones may group freely.
    def op_combiner
      @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
    end

    # Execute the first half of values, then the remainder. Note that
    # values.shift mutates the array in place, leaving the second half.
    # A new transaction number is obtained for the second half when the
    # operation is a retryable write (txn_num present).
    def split_execute(name, values, connection, context, operation_id, result_combiner, session, txn_num)
      execute_operation(name, values.shift(values.size / 2), connection, context, operation_id, result_combiner, session, txn_num)

      txn_num = session.next_txn_num if txn_num
      execute_operation(name, values, connection, context, operation_id, result_combiner, session, txn_num)
    end

    def delete_one(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num)
      Operation::Delete.new(spec).bulk_execute(connection, context: context)
    end

    def delete_many(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      # txn_num is accepted for signature parity with the single-statement
      # ops but is deliberately not merged into the spec (multi-document
      # deletes are not retryable writes).
      spec = base_spec(operation_id, session).merge(:deletes => documents)
      Operation::Delete.new(spec).bulk_execute(connection, context: context)
    end

    def insert_one(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num)
      Operation::Insert.new(spec).bulk_execute(connection, context: context)
    end

    def update_one(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      spec = base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num)
      Operation::Update.new(spec).bulk_execute(connection, context: context)
    end
    # replace_one is wire-identical to update_one; it differs only in the
    # request-document validation performed by validate_requests!.
    alias :replace_one :update_one

    def update_many(documents, connection, context, operation_id, session, txn_num)
      QueryCache.clear_namespace(collection.namespace)

      # txn_num is intentionally unused here; multi-document updates are not
      # retryable writes, so no :txn_num is sent.
      spec = base_spec(operation_id, session).merge(:updates => documents)
      Operation::Update.new(spec).bulk_execute(connection, context: context)
    end

    # Redundant visibility marker (we are already in a private section);
    # kept as-is.
    private

    # Raise if any request carries a collation but the server (as described
    # by the connection) does not support collations.
    def validate_collation!(connection)
      if op_combiner.has_collation? && !connection.features.collation_enabled?
        raise Error::UnsupportedCollation.new
      end
    end

    # Raise if any request carries array filters but the server does not
    # support them.
    def validate_array_filters!(connection)
      if op_combiner.has_array_filters? && !connection.features.array_filters_enabled?
        raise Error::UnsupportedArrayFilters.new
      end
    end

    # Raise if any request carries a hint that cannot be honored: either the
    # write is unacknowledged on a server version that can't accept hints
    # for these operations, or the server predates hint support entirely.
    def validate_hint!(connection)
      if op_combiner.has_hint?
        if !can_hint?(connection) && write_concern && !write_concern.acknowledged?
          raise Error::UnsupportedOption.hint_error(unacknowledged_write: true)
        elsif !connection.features.update_delete_option_validation_enabled?
          raise Error::UnsupportedOption.hint_error
        end
      end
    end

    # Loop through the requests and check if each operation is allowed to send
    # a hint for each operation on the given server version.
    #
    # For the following operations, the client can send a hint for servers >= 4.2
    # and for the rest, the client can only send it for 4.4+:
    # - updateOne
    # - updateMany
    # - replaceOne
    #
    # @param [ Connection ] connection The connection object.
    #
    # @return [ true | false ] Whether the request is able to send hints for
    #   the current server version.
    def can_hint?(connection)
      gte_4_2 = connection.server.description.server_version_gte?('4.2')
      gte_4_4 = connection.server.description.server_version_gte?('4.4')
      op_combiner.requests.all? do |req|
        op = req.keys.first
        if req[op].keys.include?(:hint)
          if [:update_one, :update_many, :replace_one].include?(op)
            gte_4_2
          else
            gte_4_4
          end
        else
          true
        end
      end
    end

    # Perform the request document validation required by driver specifications.
    # This method validates the first key of each update request document to be
    # an operator (i.e. start with $) and the first key of each replacement
    # document to not be an operator (i.e. not start with $). The request document
    # may be invalid without this method flagging it as such (for example an
    # update or replacement document containing some keys which are operators
    # and some which are not), in which case the driver expects the server to
    # fail the operation with an error.
    #
    # Raise an ArgumentError if requests is empty.
    #
    # @raise [ Error::InvalidUpdateDocument, Error::InvalidReplacementDocument,
    #   ArgumentError ]
    #   if the document is invalid.
    def validate_requests!
      if @requests.empty?
        raise ArgumentError, "Bulk write requests cannot be empty"
      end

      @requests.each do |req|
        if op = req.keys.first
          if [:update_one, :update_many].include?(op)
            if doc = maybe_first(req.dig(op, :update))
              if key = doc.keys&.first
                unless key.to_s.start_with?("$")
                  # Depending on the global setting this is either a hard
                  # error or only a logged warning.
                  if Mongo.validate_update_replace
                    raise Error::InvalidUpdateDocument.new(key: key)
                  else
                    Error::InvalidUpdateDocument.warn(Logger.logger, key)
                  end
                end
              end
            end
          elsif op == :replace_one
            if key = req.dig(op, :replacement)&.keys&.first
              if key.to_s.start_with?("$")
                if Mongo.validate_update_replace
                  raise Error::InvalidReplacementDocument.new(key: key)
                else
                  Error::InvalidReplacementDocument.warn(Logger.logger, key)
                end
              end
            end
          end
        end
      end
    end

    # If the given object is an array return the first element, otherwise
    # return the given object.
    #
    # @param [ Object ] obj The given object.
    #
    # @return [ Object ] The first element of the array or the given object.
    def maybe_first(obj)
      obj.is_a?(Array) ? obj.first : obj
    end
  end
end