lib/mongo/bulk_write.rb in mongo-2.5.0.beta vs lib/mongo/bulk_write.rb in mongo-2.5.0
- old
+ new
@@ -51,26 +51,35 @@
#
# @since 2.1.0
def execute
operation_id = Monitoring.next_operation_id
result_combiner = ResultCombiner.new
+ operations = op_combiner.combine
client.send(:with_session, @options) do |session|
- write_with_retry(session, Proc.new { next_primary }) do |server|
- operations = op_combiner.combine
- raise Error::UnsupportedCollation.new if op_combiner.has_collation && !server.features.collation_enabled?
- raise Error::UnsupportedArrayFilters.new if op_combiner.has_array_filters && !server.features.array_filters_enabled?
-
- operations.each do |operation|
- execute_operation(
- operation.keys.first,
- operation.values.first,
- server,
- operation_id,
- result_combiner,
- session
- )
+ operations.each do |operation|
+ if single_statement?(operation)
+ write_with_retry(session, write_concern) do |server, txn_num|
+ execute_operation(
+ operation.keys.first,
+ operation.values.first,
+ server,
+ operation_id,
+ result_combiner,
+ session,
+ txn_num)
+ end
+ else
+ legacy_write_with_retry do |server|
+ execute_operation(
+ operation.keys.first,
+ operation.values.first,
+ server,
+ operation_id,
+ result_combiner,
+ session)
+ end
end
end
end
result_combiner.result
end
@@ -135,10 +144,18 @@
WriteConcern.get(options[:write_concern]) : collection.write_concern
end
private
+ SINGLE_STATEMENT_OPS = [ :delete_one,
+ :update_one,
+ :insert_one ].freeze
+
+ def single_statement?(operation)
+ SINGLE_STATEMENT_OPS.include?(operation.keys.first)
+ end
+
def base_spec(operation_id, session)
{
:db_name => database.name,
:coll_name => collection.name,
:write_concern => write_concern,
@@ -149,53 +166,64 @@
:id_generator => client.options[:id_generator],
:session => session
}
end
- def execute_operation(name, values, server, operation_id, combiner, session)
+ def execute_operation(name, values, server, operation_id, combiner, session, txn_num = nil)
+ raise Error::UnsupportedCollation.new if op_combiner.has_collation && !server.features.collation_enabled?
+ raise Error::UnsupportedArrayFilters.new if op_combiner.has_array_filters && !server.features.array_filters_enabled?
begin
if values.size > server.max_write_batch_size
- split_execute(name, values, server, operation_id, combiner, session)
+ split_execute(name, values, server, operation_id, combiner, session, txn_num)
else
- combiner.combine!(send(name, values, server, operation_id, session), values.size)
+ combiner.combine!(send(name, values, server, operation_id, session, txn_num), values.size)
end
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
raise e if values.size <= 1
- split_execute(name, values, server, operation_id, combiner, session)
+ split_execute(name, values, server, operation_id, combiner, session, txn_num)
end
end
def op_combiner
@op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
end
- def split_execute(name, values, server, operation_id, combiner, session)
- execute_operation(name, values.shift(values.size / 2), server, operation_id, combiner, session)
- execute_operation(name, values, server, operation_id, combiner, session)
+ def split_execute(name, values, server, operation_id, combiner, session, txn_num)
+ execute_operation(name, values.shift(values.size / 2), server, operation_id, combiner, session, txn_num)
+
+ txn_num = session.next_txn_num if txn_num
+ execute_operation(name, values, server, operation_id, combiner, session, txn_num)
end
- def delete(documents, server, operation_id, session)
+ def delete_one(documents, server, operation_id, session, txn_num)
Operation::Write::Bulk::Delete.new(
- base_spec(operation_id, session).merge(:deletes => documents)
+ base_spec(operation_id, session).merge(:deletes => documents, :txn_num => txn_num)
).execute(server)
end
- alias :delete_one :delete
- alias :delete_many :delete
+ def delete_many(documents, server, operation_id, session, txn_num)
+ Operation::Write::Bulk::Delete.new(
+ base_spec(operation_id, session).merge(:deletes => documents)
+ ).execute(server)
+ end
- def insert_one(documents, server, operation_id, session)
+
+ def insert_one(documents, server, operation_id, session, txn_num)
Operation::Write::Bulk::Insert.new(
- base_spec(operation_id, session).merge(:documents => documents)
+ base_spec(operation_id, session).merge(:documents => documents, :txn_num => txn_num)
).execute(server)
end
- def update(documents, server, operation_id, session)
+ def update_one(documents, server, operation_id, session, txn_num)
Operation::Write::Bulk::Update.new(
- base_spec(operation_id, session).merge(:updates => documents)
+ base_spec(operation_id, session).merge(:updates => documents, :txn_num => txn_num)
).execute(server)
end
+ alias :replace_one :update_one
- alias :replace_one :update
- alias :update_one :update
- alias :update_many :update
+ def update_many(documents, server, operation_id, session, txn_num)
+ Operation::Write::Bulk::Update.new(
+ base_spec(operation_id, session).merge(:updates => documents)
+ ).execute(server)
+ end
end
end