lib/mongo/retryable/read_worker.rb in mongo-2.20.1 vs lib/mongo/retryable/read_worker.rb in mongo-2.21.0
- old
+ new
@@ -58,23 +58,25 @@
# @param [ Mongo::Session ] session The session that the operation is being
# run on.
# @param [ Mongo::ServerSelector::Selectable ] server_selector Server
# selector for the operation.
# @param [ CollectionView ] view The +CollectionView+ defining the query.
+ # @param [ Operation::Context | nil ] context the operation context to use
+ # with the cursor.
# @param [ Proc ] block The block to execute.
#
# @return [ Cursor ] The cursor for the result set.
- def read_with_retry_cursor(session, server_selector, view, &block)
- read_with_retry(session, server_selector) do |server|
+ def read_with_retry_cursor(session, server_selector, view, context: nil, &block)
+ read_with_retry(session, server_selector, context) do |server|
result = yield server
# RUBY-2367: This will be updated to allow the query cache to
# cache cursors with multi-batch results.
if QueryCache.enabled? && !view.collection.system_collection?
- CachingCursor.new(view, result, server, session: session)
+ CachingCursor.new(view, result, server, session: session, context: context)
else
- Cursor.new(view, result, server, session: session)
+ Cursor.new(view, result, server, session: session, context: context)
end
end
end
# Execute a read operation with retrying.
@@ -105,20 +107,22 @@
#
# @param [ Mongo::Session | nil ] session The session that the operation
# is being run on.
# @param [ Mongo::ServerSelector::Selectable | nil ] server_selector
# Server selector for the operation.
+ # @param [ Mongo::Operation::Context | nil ] context Context for the
+ # read operation.
# @param [ Proc ] block The block to execute.
#
# @return [ Result ] The result of the operation.
- def read_with_retry(session = nil, server_selector = nil, &block)
+ def read_with_retry(session = nil, server_selector = nil, context = nil, &block)
if session.nil? && server_selector.nil?
deprecated_legacy_read_with_retry(&block)
elsif session&.retry_reads?
- modern_read_with_retry(session, server_selector, &block)
+ modern_read_with_retry(session, server_selector, context, &block)
elsif client.max_read_retries > 0
- legacy_read_with_retry(session, server_selector, &block)
+ legacy_read_with_retry(session, server_selector, context, &block)
else
read_without_retry(session, server_selector, &block)
end
end
@@ -184,42 +188,51 @@
#
# @param [ Mongo::Session ] session The session that the operation is
# being run on.
# @param [ Mongo::ServerSelector::Selectable ] server_selector Server
# selector for the operation.
+ # @param [ Mongo::Operation::Context ] context Context for the
+ # read operation.
# @param [ Proc ] block The block to execute.
#
# @return [ Result ] The result of the operation.
- def modern_read_with_retry(session, server_selector, &block)
- server = select_server(cluster, server_selector, session)
+ def modern_read_with_retry(session, server_selector, context, &block)
+ server = select_server(
+ cluster,
+ server_selector,
+ session,
+ timeout: context&.remaining_timeout_sec
+ )
yield server
- rescue *retryable_exceptions, Error::OperationFailure, Auth::Unauthorized, Error::PoolError => e
+ rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e
e.add_notes('modern retry', 'attempt 1')
raise e if session.in_transaction?
raise e if !is_retryable_exception?(e) && !e.write_retryable?
- retry_read(e, session, server_selector, failed_server: server, &block)
+ retry_read(e, session, server_selector, context: context, failed_server: server, &block)
end
# Attempts to do a "legacy" read with retry. The operation will be
# attempted multiple times, up to the client's `max_read_retries`
# setting.
#
# @param [ Mongo::Session ] session The session that the operation is
# being run on.
# @param [ Mongo::ServerSelector::Selectable ] server_selector Server
# selector for the operation.
+ # @param [ Mongo::Operation::Context | nil ] context Context for the
+ # read operation.
# @param [ Proc ] block The block to execute.
#
# @return [ Result ] The result of the operation.
- def legacy_read_with_retry(session, server_selector, &block)
+ def legacy_read_with_retry(session, server_selector, context = nil, &block)
+ context&.check_timeout!
attempt = attempt ? attempt + 1 : 1
yield select_server(cluster, server_selector, session)
- rescue *legacy_retryable_exceptions, Error::OperationFailure => e
+ rescue *legacy_retryable_exceptions, Error::OperationFailure::Family => e
e.add_notes('legacy retry', "attempt #{attempt}")
if is_legacy_retryable_exception?(e)
-
raise e if attempt > client.max_read_retries || session&.in_transaction?
elsif e.retryable? && !session&.in_transaction?
raise e if attempt > client.max_read_retries
else
raise e
@@ -243,11 +256,11 @@
def read_without_retry(session, server_selector, &block)
server = select_server(cluster, server_selector, session)
begin
yield server
- rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure => e
+ rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure::Family => e
e.add_note('retries disabled')
raise e
end
end
@@ -257,42 +270,69 @@
# the retry.
# @param [ Mongo::Session ] session The session that the operation is
# being run on.
# @param [ Mongo::ServerSelector::Selectable ] server_selector Server
# selector for the operation.
- # @param [ Mongo::Server ] failed_server The server on which the original
+ # @param [ Mongo::Operation::Context | nil ] :context Context for the
+ # read operation.
+ # @param [ Mongo::Server | nil ] :failed_server The server on which the original
# operation failed.
# @param [ Proc ] block The block to execute.
#
# @return [ Result ] The result of the operation.
- def retry_read(original_error, session, server_selector, failed_server: nil, &block)
- begin
- server = select_server(cluster, server_selector, session, failed_server)
- rescue Error, Error::AuthError => e
- original_error.add_note("later retry failed: #{e.class}: #{e}")
- raise original_error
- end
+ def retry_read(original_error, session, server_selector, context: nil, failed_server: nil, &block)
+ server = select_server_for_retry(
+ original_error, session, server_selector, context, failed_server
+ )
log_retry(original_error, message: 'Read retry')
begin
+ context&.check_timeout!
+ attempt = attempt ? attempt + 1 : 2
yield server, true
+ rescue Error::TimeoutError
+ raise
rescue *retryable_exceptions => e
- e.add_notes('modern retry', 'attempt 2')
- raise e
- rescue Error::OperationFailure, Error::PoolError => e
+ e.add_notes('modern retry', "attempt #{attempt}")
+ if context&.csot?
+ failed_server = server
+ retry
+ else
+ raise e
+ end
+ rescue Error::OperationFailure::Family, Error::PoolError => e
e.add_note('modern retry')
- unless e.write_retryable?
+ if e.write_retryable?
+ e.add_note("attempt #{attempt}")
+ if context&.csot?
+ failed_server = server
+ retry
+ else
+ raise e
+ end
+ else
original_error.add_note("later retry failed: #{e.class}: #{e}")
raise original_error
end
- e.add_note("attempt 2")
- raise e
rescue Error, Error::AuthError => e
e.add_note('modern retry')
original_error.add_note("later retry failed: #{e.class}: #{e}")
raise original_error
end
+ end
+
+ def select_server_for_retry(original_error, session, server_selector, context, failed_server)
+ select_server(
+ cluster,
+ server_selector,
+ session,
+ failed_server,
+ timeout: context&.remaining_timeout_sec
+ )
+ rescue Error, Error::AuthError => e
+ original_error.add_note("later retry failed: #{e.class}: #{e}")
+ raise original_error
end
end
end
end