lib/mongo/bulk_write.rb in mongo-2.20.1 vs lib/mongo/bulk_write.rb in mongo-2.21.0
- old
+ new
@@ -58,14 +58,19 @@
def execute
operation_id = Monitoring.next_operation_id
result_combiner = ResultCombiner.new
operations = op_combiner.combine
validate_requests!
+ deadline = calculate_deadline
- client.send(:with_session, @options) do |session|
- context = Operation::Context.new(client: client, session: session)
+ client.with_session(@options) do |session|
operations.each do |operation|
+ context = Operation::Context.new(
+ client: client,
+ session: session,
+ operation_timeouts: { operation_timeout_ms: op_timeout_ms(deadline) }
+ )
if single_statement?(operation)
write_concern = write_concern(session)
write_with_retry(write_concern, context: context) do |connection, txn_num, context|
execute_operation(
operation.keys.first,
@@ -122,10 +127,13 @@
# @since 2.1.0
def initialize(collection, requests, options = {})
@collection = collection
@requests = requests
@options = options || {}
+ if @options[:timeout_ms] && @options[:timeout_ms] < 0
+ raise ArgumentError, "timeout_ms options must be non-negative integer"
+ end
end
# Is the bulk write ordered?
#
# @api private
@@ -159,9 +167,34 @@
private
SINGLE_STATEMENT_OPS = [ :delete_one,
:update_one,
:insert_one ].freeze
+
+ # @return [ Float | nil ] Deadline for the batch of operations, if set.
+ def calculate_deadline
+ timeout_ms = @options[:timeout_ms] || collection.timeout_ms
+ return nil if timeout_ms.nil?
+
+ if timeout_ms == 0
+ 0
+ else
+ Utils.monotonic_time + (timeout_ms / 1_000.0)
+ end
+ end
+
+ # @param [ Float | nil ] deadline Deadline for the batch of operations.
+ #
+ # @return [ Integer | nil ] Timeout in milliseconds for the next operation.
+ def op_timeout_ms(deadline)
+ return nil if deadline.nil?
+
+ if deadline == 0
+ 0
+ else
+ ((deadline - Utils.monotonic_time) * 1_000).to_i
+ end
+ end
def single_statement?(operation)
SINGLE_STATEMENT_OPS.include?(operation.keys.first)
end