lib/mongo/bulk_write.rb in mongo-2.5.0.beta vs lib/mongo/bulk_write.rb in mongo-2.5.0

- old
+ new

@@ -51,26 +51,35 @@ # # @since 2.1.0 def execute operation_id = Monitoring.next_operation_id result_combiner = ResultCombiner.new + operations = op_combiner.combine client.send(:with_session, @options) do |session| - write_with_retry(session, Proc.new { next_primary }) do |server| - operations = op_combiner.combine - raise Error::UnsupportedCollation.new if op_combiner.has_collation && !server.features.collation_enabled? - raise Error::UnsupportedArrayFilters.new if op_combiner.has_array_filters && !server.features.array_filters_enabled? - - operations.each do |operation| - execute_operation( - operation.keys.first, - operation.values.first, - server, - operation_id, - result_combiner, - session - ) + operations.each do |operation| + if single_statement?(operation) + write_with_retry(session, write_concern) do |server, txn_num| + execute_operation( + operation.keys.first, + operation.values.first, + server, + operation_id, + result_combiner, + session, + txn_num) + end + else + legacy_write_with_retry do |server| + execute_operation( + operation.keys.first, + operation.values.first, + server, + operation_id, + result_combiner, + session) + end end end end result_combiner.result end @@ -135,10 +144,18 @@ WriteConcern.get(options[:write_concern]) : collection.write_concern end private + SINGLE_STATEMENT_OPS = [ :delete_one, + :update_one, + :insert_one ].freeze + + def single_statement?(operation) + SINGLE_STATEMENT_OPS.include?(operation.keys.first) + end + def base_spec(operation_id, session) { :db_name => database.name, :coll_name => collection.name, :write_concern => write_concern, @@ -149,53 +166,64 @@ :id_generator => client.options[:id_generator], :session => session } end - def execute_operation(name, values, server, operation_id, combiner, session) + def execute_operation(name, values, server, operation_id, combiner, session, txn_num = nil) + raise Error::UnsupportedCollation.new if op_combiner.has_collation && !server.features.collation_enabled? + raise Error::UnsupportedArrayFilters.new if op_combiner.has_array_filters && !server.features.array_filters_enabled? begin if values.size > server.max_write_batch_size - split_execute(name, values, server, operation_id, combiner, session) + split_execute(name, values, server, operation_id, combiner, session, txn_num) else - combiner.combine!(send(name, values, server, operation_id, session), values.size) + combiner.combine!(send(name, values, server, operation_id, session, txn_num), values.size) end rescue Error::MaxBSONSize, Error::MaxMessageSize => e raise e if values.size <= 1 - split_execute(name, values, server, operation_id, combiner, session) + split_execute(name, values, server, operation_id, combiner, session, txn_num) end end def op_combiner @op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests) end - def split_execute(name, values, server, operation_id, combiner, session) - execute_operation(name, values.shift(values.size / 2), server, operation_id, combiner, session) - execute_operation(name, values, server, operation_id, combiner, session) + def split_execute(name, values, server, operation_id, combiner, session, txn_num) + execute_operation(name, values.shift(values.size / 2), server, operation_id, combiner, session, txn_num) + + txn_num = session.next_txn_num if txn_num + execute_operation(name, values, server, operation_id, combiner, session, txn_num) end - def delete(documents, server, operation_id, session) + def delete_one(documents, server, operation_id, session, txn_num) Operation::Write::Bulk::Delete.new( - base_spec(operation_id, session).merge(:deletes => documents) + base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num) ).execute(server) end - alias :delete_one :delete - alias :delete_many :delete + def delete_many(documents, server, operation_id, session, txn_num) + Operation::Write::Bulk::Delete.new( + base_spec(operation_id, session).merge(:deletes => documents) + ).execute(server) + end - def insert_one(documents, server, operation_id, session) + + def insert_one(documents, server, operation_id, session, txn_num) Operation::Write::Bulk::Insert.new( - base_spec(operation_id, session).merge(:documents => documents) + base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num) ).execute(server) end - def update(documents, server, operation_id, session) + def update_one(documents, server, operation_id, session, txn_num) Operation::Write::Bulk::Update.new( - base_spec(operation_id, session).merge(:updates => documents) + base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num) ).execute(server) end + alias :replace_one :update_one - alias :replace_one :update - alias :update_one :update - alias :update_many :update + def update_many(documents, server, operation_id, session, txn_num) + Operation::Write::Bulk::Update.new( + base_spec(operation_id, session).merge(:updates => documents) + ).execute(server) + end end end