lib/mongo/retryable/read_worker.rb in mongo-2.20.1 vs lib/mongo/retryable/read_worker.rb in mongo-2.21.0

- old
+ new

@@ -58,23 +58,25 @@ # @param [ Mongo::Session ] session The session that the operation is being # run on. # @param [ Mongo::ServerSelector::Selectable ] server_selector Server # selector for the operation. # @param [ CollectionView ] view The +CollectionView+ defining the query. + # @param [ Operation::Context | nil ] context the operation context to use + # with the cursor. # @param [ Proc ] block The block to execute. # # @return [ Cursor ] The cursor for the result set. - def read_with_retry_cursor(session, server_selector, view, &block) - read_with_retry(session, server_selector) do |server| + def read_with_retry_cursor(session, server_selector, view, context: nil, &block) + read_with_retry(session, server_selector, context) do |server| result = yield server # RUBY-2367: This will be updated to allow the query cache to # cache cursors with multi-batch results. if QueryCache.enabled? && !view.collection.system_collection? - CachingCursor.new(view, result, server, session: session) + CachingCursor.new(view, result, server, session: session, context: context) else - Cursor.new(view, result, server, session: session) + Cursor.new(view, result, server, session: session, context: context) end end end # Execute a read operation with retrying. @@ -105,20 +107,22 @@ # # @param [ Mongo::Session | nil ] session The session that the operation # is being run on. # @param [ Mongo::ServerSelector::Selectable | nil ] server_selector # Server selector for the operation. + # @param [ Mongo::Operation::Context | nil ] context Context for the + # read operation. # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. - def read_with_retry(session = nil, server_selector = nil, &block) + def read_with_retry(session = nil, server_selector = nil, context = nil, &block) if session.nil? && server_selector.nil? deprecated_legacy_read_with_retry(&block) elsif session&.retry_reads? - modern_read_with_retry(session, server_selector, &block) + modern_read_with_retry(session, server_selector, context, &block) elsif client.max_read_retries > 0 - legacy_read_with_retry(session, server_selector, &block) + legacy_read_with_retry(session, server_selector, context, &block) else read_without_retry(session, server_selector, &block) end end @@ -184,42 +188,51 @@ # # @param [ Mongo::Session ] session The session that the operation is # being run on. # @param [ Mongo::ServerSelector::Selectable ] server_selector Server # selector for the operation. + # @param [ Mongo::Operation::Context ] context Context for the + # read operation. # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. - def modern_read_with_retry(session, server_selector, &block) - server = select_server(cluster, server_selector, session) + def modern_read_with_retry(session, server_selector, context, &block) + server = select_server( + cluster, + server_selector, + session, + timeout: context&.remaining_timeout_sec + ) yield server - rescue *retryable_exceptions, Error::OperationFailure, Auth::Unauthorized, Error::PoolError => e + rescue *retryable_exceptions, Error::OperationFailure::Family, Auth::Unauthorized, Error::PoolError => e e.add_notes('modern retry', 'attempt 1') raise e if session.in_transaction? raise e if !is_retryable_exception?(e) && !e.write_retryable? - retry_read(e, session, server_selector, failed_server: server, &block) + retry_read(e, session, server_selector, context: context, failed_server: server, &block) end # Attempts to do a "legacy" read with retry. The operation will be # attempted multiple times, up to the client's `max_read_retries` # setting. # # @param [ Mongo::Session ] session The session that the operation is # being run on. # @param [ Mongo::ServerSelector::Selectable ] server_selector Server # selector for the operation. + # @param [ Mongo::Operation::Context | nil ] context Context for the + # read operation. # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. - def legacy_read_with_retry(session, server_selector, &block) + def legacy_read_with_retry(session, server_selector, context = nil, &block) + context&.check_timeout! attempt = attempt ? attempt + 1 : 1 yield select_server(cluster, server_selector, session) - rescue *legacy_retryable_exceptions, Error::OperationFailure => e + rescue *legacy_retryable_exceptions, Error::OperationFailure::Family => e e.add_notes('legacy retry', "attempt #{attempt}") if is_legacy_retryable_exception?(e) - raise e if attempt > client.max_read_retries || session&.in_transaction? elsif e.retryable? && !session&.in_transaction? raise e if attempt > client.max_read_retries else raise e @@ -243,11 +256,11 @@ def read_without_retry(session, server_selector, &block) server = select_server(cluster, server_selector, session) begin yield server - rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure => e + rescue *retryable_exceptions, Error::PoolError, Error::OperationFailure::Family => e e.add_note('retries disabled') raise e end end @@ -257,42 +270,69 @@ # the retry. # @param [ Mongo::Session ] session The session that the operation is # being run on. # @param [ Mongo::ServerSelector::Selectable ] server_selector Server # selector for the operation. - # @param [ Mongo::Server ] failed_server The server on which the original + # @param [ Mongo::Operation::Context | nil ] :context Context for the + # read operation. + # @param [ Mongo::Server | nil ] :failed_server The server on which the original # operation failed. # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. - def retry_read(original_error, session, server_selector, failed_server: nil, &block) - begin - server = select_server(cluster, server_selector, session, failed_server) - rescue Error, Error::AuthError => e - original_error.add_note("later retry failed: #{e.class}: #{e}") - raise original_error - end + def retry_read(original_error, session, server_selector, context: nil, failed_server: nil, &block) + server = select_server_for_retry( + original_error, session, server_selector, context, failed_server + ) log_retry(original_error, message: 'Read retry') begin + context&.check_timeout! + attempt = attempt ? attempt + 1 : 2 yield server, true + rescue Error::TimeoutError + raise rescue *retryable_exceptions => e - e.add_notes('modern retry', 'attempt 2') - raise e - rescue Error::OperationFailure, Error::PoolError => e + e.add_notes('modern retry', "attempt #{attempt}") + if context&.csot? + failed_server = server + retry + else + raise e + end + rescue Error::OperationFailure::Family, Error::PoolError => e e.add_note('modern retry') - unless e.write_retryable? + if e.write_retryable? + e.add_note("attempt #{attempt}") + if context&.csot? + failed_server = server + retry + else + raise e + end + else original_error.add_note("later retry failed: #{e.class}: #{e}") raise original_error end - e.add_note("attempt 2") - raise e rescue Error, Error::AuthError => e e.add_note('modern retry') original_error.add_note("later retry failed: #{e.class}: #{e}") raise original_error end + end + + def select_server_for_retry(original_error, session, server_selector, context, failed_server) + select_server( + cluster, + server_selector, + session, + failed_server, + timeout: context&.remaining_timeout_sec + ) + rescue Error, Error::AuthError => e + original_error.add_note("later retry failed: #{e.class}: #{e}") + raise original_error end end end end