lib/mongo/retryable.rb in mongo-2.8.0 vs lib/mongo/retryable.rb in mongo-2.9.0.rc0
- old
+ new
@@ -17,55 +17,120 @@
# Defines basic behavior around retrying operations.
#
# @since 2.1.0
module Retryable
- # Execute a read operation with a retry.
+ # Execute a read operation returning a cursor with retrying.
#
+ # This method performs server selection for the specified server selector
+ # and yields to the provided block, which should execute the initial
+ # query operation and return its result. The block will be passed the
+ # server selected for the operation. If the block raises an exception,
+ # and this exception corresponds to a read retryable error, and read
+ # retries are enabled for the client, this method will perform server
+ # selection again and yield to the block again (with potentially a
+ # different server). If the block returns successfully, the result
+ # of the block (which should be a Mongo::Operation::Result) is used to
+ # construct a Mongo::Cursor object for the result set. The cursor
+ # is then returned.
+ #
+ # If modern retry reads are on (which is the default), the initial read
+ # operation will be retried once. If legacy retry reads are on, the
+ # initial read operation will be retried zero or more times depending
+ # on the :max_read_retries client setting, the default for which is 1.
+ # To disable read retries, turn off modern read retries by setting
+ # retry_reads: false and set :max_read_retries to 0 on the client.
+ #
# @api private
#
- # @example Execute the read.
- # read_with_retry do
+ # @example Execute a read returning a cursor.
+ # cursor = read_with_retry_cursor(session, server_selector, view) do |server|
+ # # return a Mongo::Operation::Result
# ...
# end
#
- # @note This only retries read operations on socket errors.
+ # @param [ Mongo::Session ] session The session that the operation is being
+ # run on.
+ # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
+ # selector for the operation.
+ # @param [ CollectionView ] view The +CollectionView+ defining the query.
+ # @param [ Proc ] block The block to execute.
#
- # @param [ Mongo::Session ] session The session that the operation is being run on.
+ # @return [ Cursor ] The cursor for the result set.
+ def read_with_retry_cursor(session, server_selector, view, &block)
+ read_with_retry(session, server_selector) do |server|
+ result = yield server
+ Cursor.new(view, result, server, session: session)
+ end
+ end
+
+ # Execute a read operation with retrying.
+ #
+ # This method performs server selection for the specified server selector
+ # and yields to the provided block, which should execute the initial
+ # query operation and return its result. The block will be passed the
+ # server selected for the operation. If the block raises an exception,
+ # and this exception corresponds to a read retryable error, and read
+ # retries are enabled for the client, this method will perform server
+ # selection again and yield to the block again (with potentially a
+ # different server). If the block returns successfully, the result
+ # of the block is returned.
+ #
+ # If modern retry reads are on (which is the default), the initial read
+ # operation will be retried once. If legacy retry reads are on, the
+ # initial read operation will be retried zero or more times depending
+ # on the :max_read_retries client setting, the default for which is 1.
+ # To disable read retries, turn off modern read retries by setting
+ # retry_reads: false and set :max_read_retries to 0 on the client.
+ #
+ # @api private
+ #
+ # @example Execute the read.
+ # read_with_retry(session, server_selector) do |server|
+ # ...
+ # end
+ #
+ # @param [ Mongo::Session ] session The session that the operation is being
+ # run on.
+ # @param [ Mongo::ServerSelector::Selectable ] server_selector Server
+ # selector for the operation.
# @param [ Proc ] block The block to execute.
#
# @return [ Result ] The result of the operation.
- #
- # @since 2.1.0
- def read_with_retry(session = nil)
- attempt = 0
- begin
- attempt += 1
- yield
- rescue Error::SocketError, Error::SocketTimeoutError => e
- if attempt > cluster.max_read_retries || (session && session.in_transaction?)
- raise
+ def read_with_retry(session = nil, server_selector = nil, &block)
+ if session.nil? && server_selector.nil?
+ # Older versions of Mongoid call read_with_retry without arguments.
+ # This is already not correct in a MongoDB 3.6+ environment with
+ # sessions. For compatibility we emulate the legacy driver behavior
+ # here but upgrading Mongoid is strongly recommended.
+ unless $_mongo_read_with_retry_warned
+ $_mongo_read_with_retry_warned = true
+ Logger.logger.warn("Legacy read_with_retry invocation - please update the application and/or its dependencies")
end
- log_retry(e)
- cluster.scan!(false)
- retry
- rescue Error::OperationFailure => e
- if cluster.sharded? && e.retryable? && !(session && session.in_transaction?)
- if attempt > cluster.max_read_retries
- raise
- end
- log_retry(e)
- sleep(cluster.read_retry_interval)
- retry
- else
- raise
- end
+ # Since we don't have a session, we cannot use the modern read retries.
+ # And we need to select a server but we don't have a server selector.
+ # Use PrimaryPreferred which will work as long as there is a data
+ # bearing node in the cluster; the block may select a different server
+ # which is fine.
+ server_selector = ServerSelector.get(mode: :primary_preferred)
+ legacy_read_with_retry(nil, server_selector, &block)
+ elsif session && session.retry_reads?
+ modern_read_with_retry(session, server_selector, &block)
+ elsif client.max_read_retries > 0
+ legacy_read_with_retry(session, server_selector, &block)
+ else
+ server = select_server(cluster, server_selector)
+ yield server
end
end
- # Execute a read operation with a single retry.
+ # Execute a read operation with a single retry on network errors.
#
+ # This method is used by the driver for some of the internal housekeeping
+ # operations. Application-requested reads should use read_with_retry
+ # rather than this method.
+ #
# @api private
#
# @example Execute the read.
# read_with_one_retry do
# ...
@@ -157,10 +222,56 @@
end
end
private
+ def modern_read_with_retry(session, server_selector, &block)
+ attempt = 0
+ server = select_server(cluster, server_selector)
+ begin
+ yield server
+ rescue Error::SocketError, Error::SocketTimeoutError => e
+ if session.in_transaction?
+ raise
+ end
+ retry_read(e, server_selector, &block)
+ rescue Error::OperationFailure => e
+ if session.in_transaction? || !e.write_retryable?
+ raise
+ end
+ retry_read(e, server_selector, &block)
+ end
+ end
+
+ def legacy_read_with_retry(session, server_selector)
+ attempt = 0
+ server = select_server(cluster, server_selector)
+ begin
+ attempt += 1
+ yield server
+ rescue Error::SocketError, Error::SocketTimeoutError => e
+ if attempt > client.max_read_retries || (session && session.in_transaction?)
+ raise
+ end
+ log_retry(e, message: 'Legacy read retry')
+ server = select_server(cluster, server_selector)
+ retry
+ rescue Error::OperationFailure => e
+ if cluster.sharded? && e.retryable? && !(session && session.in_transaction?)
+ if attempt > client.max_read_retries
+ raise
+ end
+ log_retry(e, message: 'Legacy read retry')
+ sleep(client.read_retry_interval)
+ server = select_server(cluster, server_selector)
+ retry
+ else
+ raise
+ end
+ end
+ end
+
def retry_write_allowed?(session, write_concern)
unless session && session.retry_writes?
return false
end
@@ -172,19 +283,40 @@
end
write_concern.acknowledged?
end
end
+ def retry_read(original_error, server_selector, &block)
+ begin
+ server = select_server(cluster, server_selector)
+ rescue
+ raise original_error
+ end
+
+ log_retry(original_error, message: 'Read retry')
+
+ begin
+ yield server, true
+ rescue Error::SocketError, Error::SocketTimeoutError => e
+ raise e
+ rescue Error::OperationFailure => e
+ raise original_error unless e.write_retryable?
+ raise e
+ rescue
+ raise original_error
+ end
+ end
+
def retry_write(original_error, txn_num, &block)
# We do not request a scan of the cluster here, because error handling
# for the error which triggered the retry should have updated the
# server description and/or topology as necessary (specifically,
# a socket error or a not master error should have marked the respective
# server unknown). Here we just need to wait for server selection.
server = cluster.next_primary
raise original_error unless (server.retry_writes? && txn_num)
- log_retry(original_error)
+ log_retry(original_error, message: 'Write retry')
yield(server, txn_num, true)
rescue Error::SocketError, Error::SocketTimeoutError => e
raise e
rescue Error::OperationFailure => e
raise original_error unless e.write_retryable?
@@ -201,20 +333,26 @@
begin
attempt += 1
yield(server || cluster.next_primary)
rescue Error::OperationFailure => e
server = nil
- if attempt > Cluster::MAX_WRITE_RETRIES
+ if attempt > client.max_write_retries
raise
end
if e.write_retryable? && !(session && session.in_transaction?)
- log_retry(e)
+ log_retry(e, message: 'Legacy write retry')
cluster.scan!(false)
retry
else
raise
end
end
+ end
+
+ # This is a separate method to make it possible for the test suite to
+ # assert that server selection is performed during retry attempts.
+ def select_server(cluster, server_selector)
+ server_selector.select_server(cluster)
end
# Log a warning so that any application slow down is immediately obvious.
def log_retry(e, options = nil)
message = if options && options[:message]