lib/mongo/retryable.rb in mongo-2.18.3 vs lib/mongo/retryable.rb in mongo-2.19.0
- old
+ new
@@ -1,7 +1,7 @@
# frozen_string_literal: true
-# encoding: utf-8
+# rubocop:todo all
# Copyright (C) 2015-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,532 +13,61 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+require 'mongo/retryable/read_worker'
+require 'mongo/retryable/write_worker'
+
module Mongo
# Defines basic behavior around retrying operations.
#
# @since 2.1.0
module Retryable
+ extend Forwardable
- # Execute a read operation returning a cursor with retrying.
- #
- # This method performs server selection for the specified server selector
- # and yields to the provided block, which should execute the initial
- # query operation and return its result. The block will be passed the
- # server selected for the operation. If the block raises an exception,
- # and this exception corresponds to a read retryable error, and read
- # retries are enabled for the client, this method will perform server
- # selection again and yield to the block again (with potentially a
- # different server). If the block returns successfully, the result
- # of the block (which should be a Mongo::Operation::Result) is used to
- # construct a Mongo::Cursor object for the result set. The cursor
- # is then returned.
- #
- # If modern retry reads are on (which is the default), the initial read
- # operation will be retried once. If legacy retry reads are on, the
- # initial read operation will be retried zero or more times depending
- # on the :max_read_retries client setting, the default for which is 1.
- # To disable read retries, turn off modern read retries by setting
- # retry_reads: false and set :max_read_retries to 0 on the client.
- #
- # @api private
- #
- # @example Execute a read returning a cursor.
- # cursor = read_with_retry_cursor(session, server_selector, view) do |server|
- # # return a Mongo::Operation::Result
- # ...
- # end
- #
- # @param [ Mongo::Session ] session The session that the operation is being
- # run on.
- # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
- # selector for the operation.
- # @param [ CollectionView ] view The +CollectionView+ defining the query.
- # @param [ Proc ] block The block to execute.
- #
- # @return [ Cursor ] The cursor for the result set.
- def read_with_retry_cursor(session, server_selector, view, &block)
- read_with_retry(session, server_selector) do |server|
- result = yield server
+ # Delegate the public read_with_retry methods to the read_worker
+ def_delegators :read_worker,
+ :read_with_retry_cursor,
+ :read_with_retry,
+ :read_with_one_retry
- # RUBY-2367: This will be updated to allow the query cache to
- # cache cursors with multi-batch results.
- if QueryCache.enabled? && !view.collection.system_collection?
- CachingCursor.new(view, result, server, session: session)
- else
- Cursor.new(view, result, server, session: session)
- end
- end
- end
+ # Delegate the public write_with_retry methods to the write_worker
+ def_delegators :write_worker,
+ :write_with_retry,
+ :nro_write_with_retry
- # Execute a read operation with retrying.
+ # This is a separate method to make it possible for the test suite to
+ # assert that server selection is performed during retry attempts.
#
- # This method performs server selection for the specified server selector
- # and yields to the provided block, which should execute the initial
- # query operation and return its result. The block will be passed the
- # server selected for the operation. If the block raises an exception,
- # and this exception corresponds to a read retryable error, and read
- # retries are enabled for the client, this method will perform server
- # selection again and yield to the block again (with potentially a
- # different server). If the block returns successfully, the result
- # of the block is returned.
+ # This is a public method so that it can be accessed via the read and
+ # write worker delegates, as needed.
#
- # If modern retry reads are on (which is the default), the initial read
- # operation will be retried once. If legacy retry reads are on, the
- # initial read operation will be retried zero or more times depending
- # on the :max_read_retries client setting, the default for which is 1.
- # To disable read retries, turn off modern read retries by setting
- # retry_reads: false and set :max_read_retries to 0 on the client.
- #
# @api private
#
- # @example Execute the read.
- # read_with_retry(session, server_selector) do |server|
- # ...
- # end
- #
- # @param [ Mongo::Session ] session The session that the operation is being
- # run on.
- # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
- # selector for the operation.
- # @param [ Proc ] block The block to execute.
- #
- # @return [ Result ] The result of the operation.
- def read_with_retry(session = nil, server_selector = nil, &block)
- if session.nil? && server_selector.nil?
- # Older versions of Mongoid call read_with_retry without arguments.
- # This is already not correct in a MongoDB 3.6+ environment with
- # sessions. For compatibility we emulate the legacy driver behavior
- # here but upgrading Mongoid is strongly recommended.
- unless $_mongo_read_with_retry_warned
- $_mongo_read_with_retry_warned = true
- Logger.logger.warn("Legacy read_with_retry invocation - please update the application and/or its dependencies")
- end
- # Since we don't have a session, we cannot use the modern read retries.
- # And we need to select a server but we don't have a server selector.
- # Use PrimaryPreferred which will work as long as there is a data
- # bearing node in the cluster; the block may select a different server
- # which is fine.
- server_selector = ServerSelector.get(mode: :primary_preferred)
- legacy_read_with_retry(nil, server_selector, &block)
- elsif session && session.retry_reads?
- modern_read_with_retry(session, server_selector, &block)
- elsif client.max_read_retries > 0
- legacy_read_with_retry(session, server_selector, &block)
- else
- server = select_server(cluster, server_selector, session)
- begin
- yield server
- rescue Error::SocketError, Error::SocketTimeoutError, Error::OperationFailure => e
- e.add_note('retries disabled')
- raise e
- end
- end
+ # @return [ Mongo::Server ] A server matching the server preference.
+ def select_server(cluster, server_selector, session)
+ server_selector.select_server(cluster, nil, session)
end
- # Execute a read operation with a single retry on network errors.
+ # Returns the read worker for handling retryable reads.
#
- # This method is used by the driver for some of the internal housekeeping
- # operations. Application-requested reads should use read_with_retry
- # rather than this method.
- #
# @api private
#
- # @example Execute the read.
- # read_with_one_retry do
- # ...
- # end
- #
- # @note This only retries read operations on socket errors.
- #
- # @param [ Hash ] options Options.
- # @yield Calls the provided block with no arguments
- #
- # @option options [ String ] :retry_message Message to log when retrying.
- #
- # @return [ Result ] The result of the operation.
- #
- # @since 2.2.6
- def read_with_one_retry(options = nil)
- yield
- rescue Error::SocketError, Error::SocketTimeoutError => e
- retry_message = options && options[:retry_message]
- log_retry(e, message: retry_message)
- yield
+ # @note this is only a public method so that tests can add expectations
+ # based on it.
+ def read_worker
+ @read_worker ||= ReadWorker.new(self)
end
- # Implements write retrying functionality by yielding to the passed
- # block one or more times.
+ # Returns the write worker for handling retryable writes.
#
- # If the session is provided (hence, the deployment supports sessions),
- # and modern retry writes are enabled on the client, the modern retry
- # logic is invoked. Otherwise the legacy retry logic is invoked.
- #
- # If ending_transaction parameter is true, indicating that a transaction
- # is being committed or aborted, the operation is executed exactly once.
- # Note that, since transactions require sessions, this method will raise
- # ArgumentError if ending_transaction is true and session is nil.
- #
# @api private
#
- # @example Execute the write.
- # write_with_retry do
- # ...
- # end
- #
- # @note This only retries operations on not master failures, since it is
- # the only case we can be sure a partial write did not already occur.
- #
- # @param [ nil | Hash | WriteConcern::Base ] write_concern The write concern.
- # @param [ true | false ] ending_transaction True if the write operation is
- # abortTransaction or commitTransaction, false otherwise.
- # @param [ Context ] context The context for the operation.
- # @param [ Proc ] block The block to execute.
- #
- # @yieldparam [ Connection ] connection The connection through which the
- # write should be sent.
- # @yieldparam [ Integer ] txn_num Transaction number (NOT the ACID kind).
- # @yieldparam [ Operation::Context ] context The operation context.
- #
- # @return [ Result ] The result of the operation.
- #
- # @since 2.1.0
- def write_with_retry(write_concern, ending_transaction: false, context:, &block)
- session = context.session
-
- if ending_transaction && !session
- raise ArgumentError, 'Cannot end a transaction without a session'
- end
-
- unless ending_transaction || retry_write_allowed?(session, write_concern)
- return legacy_write_with_retry(nil, context: context, &block)
- end
-
- # If we are here, session is not nil. A session being nil would have
- # failed retry_write_allowed? check.
-
- server = select_server(cluster, ServerSelector.primary, session)
-
- unless ending_transaction || server.retry_writes?
- return legacy_write_with_retry(server, context: context, &block)
- end
-
- txn_num = nil
-
- begin
- connection_succeeded = false
- server.with_connection(connection_global_id: context.connection_global_id) do |connection|
- connection_succeeded = true
-
- session.materialize_if_needed
- txn_num = if session.in_transaction?
- session.txn_num
- else
- session.next_txn_num
- end
-
- # The context needs to be duplicated here because we will be using
- # it later for the retry as well.
- yield(connection, txn_num, context.dup)
- end
- rescue Error::SocketError, Error::SocketTimeoutError, Auth::Unauthorized => e
- e.add_note('modern retry')
- e.add_note("attempt 1")
- if !e.label?('RetryableWriteError')
- # If we get an auth error, it was raised when connecting the connection
- # and therefore we didn't have the connection yet to add labels.
- # Therefore, check if it is retryable, and if it is, add the label
- # and retry it. We also want to retry this if there was a Socket error
- # when trying to create the connection.
- if !connection_succeeded && !session.in_transaction? && e.write_retryable?
- e.add_label('RetryableWriteError')
- else
- raise e
- end
- end
-
- # Context#with creates a new context, which is not necessary here
- # but the API is less prone to misuse this way.
- retry_write(e, txn_num, context: context.with(is_retry: true), &block)
- rescue Error::OperationFailure => e
- e.add_note('modern retry')
- e.add_note("attempt 1")
- if e.unsupported_retryable_write?
- raise_unsupported_error(e)
- elsif !e.label?('RetryableWriteError')
- raise e
- end
-
- # Context#with creates a new context, which is not necessary here
- # but the API is less prone to misuse this way.
- retry_write(e, txn_num, context: context.with(is_retry: true), &block)
- end
- end
-
- # Retryable writes wrapper for operations not supporting modern retryable
- # writes.
- #
- # If the driver is configured to use modern retryable writes, this method
- # yields to the passed block exactly once, thus not retrying any writes.
- #
- # If the driver is configured to use legacy retryable writes, this method
- # delegates to legacy_write_with_retry which performs write retries using
- # legacy logic.
- #
- # @param [ nil | Hash | WriteConcern::Base ] write_concern The write concern.
- # @param [ Context ] context The context for the operation.
- #
- # @yieldparam [ Connection ] connection The connection through which the
- # write should be sent.
- # @yieldparam [ nil ] txn_num nil as transaction number.
- # @yieldparam [ Operation::Context ] context The operation context.
- #
- # @api private
- def nro_write_with_retry(write_concern, context:, &block)
- session = context.session
-
- server = select_server(cluster, ServerSelector.primary, session)
- if session && session.client.options[:retry_writes]
- begin
- server.with_connection(connection_global_id: context.connection_global_id) do |connection|
- yield connection, nil, context
- end
- rescue Error::SocketError, Error::SocketTimeoutError, Error::OperationFailure => e
- e.add_note('retries disabled')
- raise e
- end
- else
- legacy_write_with_retry(server, context: context, &block)
- end
- end
-
- # Implements legacy write retrying functionality by yielding to the passed
- # block one or more times.
- #
- # This method is used for operations which are not supported by modern
- # retryable writes, such as delete_many and update_many.
- #
- # @param [ Server ] server The server which should be used for the
- # operation. If not provided, the current primary will be retrieved from
- # the cluster.
- # @param [ Context ] context The context for the operation.
- #
- # @yieldparam [ Connection ] connection The connection through which the
- # write should be sent.
- # @yieldparam [ nil ] txn_num nil as transaction number.
- # @yieldparam [ Operation::Context ] context The operation context.
- #
- # @api private
- def legacy_write_with_retry(server = nil, context:)
- session = context.session
-
- # This is the pre-session retry logic, and is not subject to
- # current retryable write specifications.
- # In particular it does not retry on SocketError and SocketTimeoutError.
- attempt = 0
- begin
- attempt += 1
- server ||= select_server(cluster, ServerSelector.primary, session)
- server.with_connection(connection_global_id: context.connection_global_id) do |connection|
- # Legacy retries do not use txn_num
- yield connection, nil, context.dup
- end
- rescue Error::OperationFailure => e
- e.add_note('legacy retry')
- e.add_note("attempt #{attempt}")
- server = nil
- if attempt > client.max_write_retries
- raise e
- end
- if e.label?('RetryableWriteError')
- log_retry(e, message: 'Legacy write retry')
- cluster.scan!(false)
- retry
- else
- raise e
- end
- end
- end
-
- private
-
- def modern_read_with_retry(session, server_selector, &block)
- server = select_server(cluster, server_selector, session)
- begin
- yield server
- rescue Error::SocketError, Error::SocketTimeoutError => e
- e.add_note('modern retry')
- e.add_note("attempt 1")
- if session.in_transaction?
- raise e
- end
- retry_read(e, server_selector, session, &block)
- rescue Error::OperationFailure, Auth::Unauthorized => e
- e.add_note('modern retry')
- e.add_note("attempt 1")
- if session.in_transaction? || !e.write_retryable?
- raise e
- end
- retry_read(e, server_selector, session, &block)
- end
- end
-
- def legacy_read_with_retry(session, server_selector)
- attempt = 0
- server = select_server(cluster, server_selector, session)
- begin
- attempt += 1
- yield server
- rescue Error::SocketError, Error::SocketTimeoutError => e
- e.add_note('legacy retry')
- e.add_note("attempt #{attempt}")
- if attempt > client.max_read_retries || (session && session.in_transaction?)
- raise e
- end
- log_retry(e, message: 'Legacy read retry')
- server = select_server(cluster, server_selector, session)
- retry
- rescue Error::OperationFailure => e
- e.add_note('legacy retry')
- e.add_note("attempt #{attempt}")
- if e.retryable? && !(session && session.in_transaction?)
- if attempt > client.max_read_retries
- raise e
- end
- log_retry(e, message: 'Legacy read retry')
- sleep(client.read_retry_interval)
- server = select_server(cluster, server_selector, session)
- retry
- else
- raise e
- end
- end
- end
-
- def retry_write_allowed?(session, write_concern)
- unless session && session.retry_writes?
- return false
- end
-
- if write_concern.nil?
- true
- else
- unless write_concern.is_a?(WriteConcern::Base)
- write_concern = WriteConcern.get(write_concern)
- end
- write_concern.acknowledged?
- end
- end
-
- def retry_read(original_error, server_selector, session, &block)
- begin
- server = select_server(cluster, server_selector, session)
- rescue Error, Error::AuthError => e
- original_error.add_note("later retry failed: #{e.class}: #{e}")
-
- # See the corresponding note below in retry_write.
- raise Error::RaiseOriginalError
- end
-
- log_retry(original_error, message: 'Read retry')
-
- begin
- yield server, true
- rescue Error::SocketError, Error::SocketTimeoutError => e
- e.add_note('modern retry')
- e.add_note("attempt 2")
- raise e
- rescue Error::OperationFailure => e
- e.add_note('modern retry')
- unless e.write_retryable?
- original_error.add_note("later retry failed: #{e.class}: #{e}")
- raise original_error
- end
- e.add_note("attempt 2")
- raise e
- rescue Error, Error::AuthError => e
- e.add_note('modern retry')
- original_error.add_note("later retry failed: #{e.class}: #{e}")
- raise original_error
- end
- rescue Error::RaiseOriginalError
- raise original_error
- end
-
- def retry_write(original_error, txn_num, context:, &block)
- session = context.session
-
- # We do not request a scan of the cluster here, because error handling
- # for the error which triggered the retry should have updated the
- # server description and/or topology as necessary (specifically,
- # a socket error or a not master error should have marked the respective
- # server unknown). Here we just need to wait for server selection.
- server = select_server(cluster, ServerSelector.primary, session)
- unless server.retry_writes?
- # Do not need to add "modern retry" here, it should already be on
- # the first exception.
- original_error.add_note('did not retry because server selected for retry does not supoprt retryable writes')
-
- # When we want to raise the original error, we must not run the
- # rescue blocks below that add diagnostics because the diagnostics
- # added would either be rendundant (e.g. modern retry note) or wrong
- # (e.g. "attempt 2", we are raising the exception produced in the
- # first attempt and haven't attempted the second time). Use the
- # special marker class to bypass the ordinarily applicable rescues.
- raise Error::RaiseOriginalError
- end
- log_retry(original_error, message: 'Write retry')
- server.with_connection(connection_global_id: context.connection_global_id) do |connection|
- yield(connection, txn_num, context)
- end
- rescue Error::SocketError, Error::SocketTimeoutError => e
- e.add_note('modern retry')
- e.add_note('attempt 2')
- raise e
- rescue Error::OperationFailure => e
- e.add_note('modern retry')
- if e.label?('RetryableWriteError')
- e.add_note('attempt 2')
- raise e
- else
- original_error.add_note("later retry failed: #{e.class}: #{e}")
- raise original_error
- end
- rescue Error, Error::AuthError => e
- # Do not need to add "modern retry" here, it should already be on
- # the first exception.
- original_error.add_note("later retry failed: #{e.class}: #{e}")
- raise original_error
- rescue Error::RaiseOriginalError
- raise original_error
- end
-
- # This is a separate method to make it possible for the test suite to
- # assert that server selection is performed during retry attempts.
- def select_server(cluster, server_selector, session)
- server_selector.select_server(cluster, nil, session)
- end
-
- # Log a warning so that any application slow down is immediately obvious.
- def log_retry(e, options = nil)
- message = if options && options[:message]
- options[:message]
- else
- "Retry"
- end
- Logger.logger.warn "#{message} due to: #{e.class.name}: #{e.message}"
- end
-
- # Retry writes on MMAPv1 should raise an actionable error; append actionable
- # information to the error message and preserve the backtrace.
- def raise_unsupported_error(e)
- new_error = Error::OperationFailure.new("#{e.class}: #{e} "\
- "This MongoDB deployment does not support retryable writes. Please add "\
- "retryWrites=false to your connection string or use the retry_writes: false Ruby client option")
- new_error.set_backtrace(e.backtrace)
- raise new_error
+ # @note this is only a public method so that tests can add expectations
+ # based on it.
+ def write_worker
+ @write_worker ||= WriteWorker.new(self)
end
end
end