lib/mongo/bulk_write.rb in mongo-2.20.1 vs lib/mongo/bulk_write.rb in mongo-2.21.0

- old
+ new

@@ -58,14 +58,19 @@ def execute operation_id = Monitoring.next_operation_id result_combiner = ResultCombiner.new operations = op_combiner.combine validate_requests! + deadline = calculate_deadline - client.send(:with_session, @options) do |session| - context = Operation::Context.new(client: client, session: session) + client.with_session(@options) do |session| operations.each do |operation| + context = Operation::Context.new( + client: client, + session: session, + operation_timeouts: { operation_timeout_ms: op_timeout_ms(deadline) } + ) if single_statement?(operation) write_concern = write_concern(session) write_with_retry(write_concern, context: context) do |connection, txn_num, context| execute_operation( operation.keys.first, @@ -122,10 +127,13 @@ # @since 2.1.0 def initialize(collection, requests, options = {}) @collection = collection @requests = requests @options = options || {} + if @options[:timeout_ms] && @options[:timeout_ms] < 0 + raise ArgumentError, "timeout_ms options must be non-negative integer" + end end # Is the bulk write ordered? # # @api private @@ -159,9 +167,34 @@ private SINGLE_STATEMENT_OPS = [ :delete_one, :update_one, :insert_one ].freeze + + # @return [ Float | nil ] Deadline for the batch of operations, if set. + def calculate_deadline + timeout_ms = @options[:timeout_ms] || collection.timeout_ms + return nil if timeout_ms.nil? + + if timeout_ms == 0 + 0 + else + Utils.monotonic_time + (timeout_ms / 1_000.0) + end + end + + # @param [ Float | nil ] deadline Deadline for the batch of operations. + # + # @return [ Integer | nil ] Timeout in milliseconds for the next operation. + def op_timeout_ms(deadline) + return nil if deadline.nil? + + if deadline == 0 + 0 + else + ((deadline - Utils.monotonic_time) * 1_000).to_i + end + end def single_statement?(operation) SINGLE_STATEMENT_OPS.include?(operation.keys.first) end