lib/mongo/retryable.rb in mongo-2.9.2 vs lib/mongo/retryable.rb in mongo-2.10.0.rc0
- old
+ new
@@ -116,11 +116,11 @@
elsif session && session.retry_reads?
modern_read_with_retry(session, server_selector, &block)
elsif client.max_read_retries > 0
legacy_read_with_retry(session, server_selector, &block)
else
- server = select_server(cluster, server_selector)
+ server = select_server(cluster, server_selector, session)
yield server
end
end
# Execute a read operation with a single retry on network errors.
@@ -198,11 +198,11 @@
end
# If we are here, session is not nil. A session being nil would have
# failed retry_write_allowed? check.
- server = cluster.next_primary
+ server = select_server(cluster, ServerSelector.primary, session)
unless ending_transaction || server.retry_writes?
return legacy_write_with_retry(server, session, &block)
end
@@ -211,60 +211,123 @@
yield(server, txn_num, false)
rescue Error::SocketError, Error::SocketTimeoutError => e
if session.in_transaction? && !ending_transaction
raise
end
- retry_write(e, txn_num, &block)
+ retry_write(e, session, txn_num, &block)
rescue Error::OperationFailure => e
if (session.in_transaction? && !ending_transaction) || !e.write_retryable?
raise
end
- retry_write(e, txn_num, &block)
+ retry_write(e, session, txn_num, &block)
end
end
+ # Retryable writes wrapper for operations not supporting modern retryable
+ # writes.
+ #
+ # If the driver is configured to use modern retryable writes, this method
+ # yields to the passed block exactly once, thus not retrying any writes.
+ #
+ # If the driver is configured to use legacy retryable writes, this method
+ # delegates to legacy_write_with_retry which performs write retries using
+ # legacy logic.
+ #
+ # @param [ nil | Session ] session Optional session to use with the operation.
+ # @param [ nil | Hash | WriteConcern::Base ] write_concern The write concern.
+ #
+ # @yieldparam [ Server ] server The server to which the write should be sent.
+ #
+ # @api private
+ def nro_write_with_retry(session, write_concern, &block)
+ if session && session.client.options[:retry_writes]
+ server = select_server(cluster, ServerSelector.primary, session)
+ yield server
+ else
+ legacy_write_with_retry(nil, session, &block)
+ end
+ end
+
+ # Implements legacy write retrying functionality by yielding to the passed
+ # block one or more times.
+ #
+ # This method is used for operations which are not supported by modern
+ # retryable writes, such as delete_many and update_many.
+ #
+ # @param [ Server ] server The server which should be used for the
+ # operation. If not provided, the current primary will be retrieved from
+ # the cluster.
+ # @param [ nil | Session ] session Optional session to use with the operation.
+ #
+ # @yieldparam [ Server ] server The server to which the write should be sent.
+ #
+ # @api private
+ def legacy_write_with_retry(server = nil, session = nil)
+ # This is the pre-session retry logic, and is not subject to
+ # current retryable write specifications.
+ # In particular it does not retry on SocketError and SocketTimeoutError.
+ attempt = 0
+ begin
+ attempt += 1
+ server ||= select_server(cluster, ServerSelector.primary, session)
+ yield server
+ rescue Error::OperationFailure => e
+ server = nil
+ if attempt > client.max_write_retries
+ raise
+ end
+ if e.write_retryable? && !(session && session.in_transaction?)
+ log_retry(e, message: 'Legacy write retry')
+ cluster.scan!(false)
+ retry
+ else
+ raise
+ end
+ end
+ end
+
private
def modern_read_with_retry(session, server_selector, &block)
attempt = 0
- server = select_server(cluster, server_selector)
+ server = select_server(cluster, server_selector, session)
begin
yield server
rescue Error::SocketError, Error::SocketTimeoutError => e
if session.in_transaction?
raise
end
- retry_read(e, server_selector, &block)
+ retry_read(e, server_selector, session, &block)
rescue Error::OperationFailure => e
if session.in_transaction? || !e.write_retryable?
raise
end
- retry_read(e, server_selector, &block)
+ retry_read(e, server_selector, session, &block)
end
end
def legacy_read_with_retry(session, server_selector)
attempt = 0
- server = select_server(cluster, server_selector)
+ server = select_server(cluster, server_selector, session)
begin
attempt += 1
yield server
rescue Error::SocketError, Error::SocketTimeoutError => e
if attempt > client.max_read_retries || (session && session.in_transaction?)
raise
end
log_retry(e, message: 'Legacy read retry')
- server = select_server(cluster, server_selector)
+ server = select_server(cluster, server_selector, session)
retry
rescue Error::OperationFailure => e
if cluster.sharded? && e.retryable? && !(session && session.in_transaction?)
if attempt > client.max_read_retries
raise
end
log_retry(e, message: 'Legacy read retry')
sleep(client.read_retry_interval)
- server = select_server(cluster, server_selector)
+ server = select_server(cluster, server_selector, session)
retry
else
raise
end
end
@@ -283,13 +346,13 @@
end
write_concern.acknowledged?
end
end
- def retry_read(original_error, server_selector, &block)
+ def retry_read(original_error, server_selector, session, &block)
begin
- server = select_server(cluster, server_selector)
+ server = select_server(cluster, server_selector, session)
rescue
raise original_error
end
log_retry(original_error, message: 'Read retry')
@@ -304,17 +367,17 @@
rescue
raise original_error
end
end
- def retry_write(original_error, txn_num, &block)
+ def retry_write(original_error, session, txn_num, &block)
# We do not request a scan of the cluster here, because error handling
# for the error which triggered the retry should have updated the
# server description and/or topology as necessary (specifically,
# a socket error or a not master error should have marked the respective
# server unknown). Here we just need to wait for server selection.
- server = cluster.next_primary
+ server = select_server(cluster, ServerSelector.primary, session)
raise original_error unless (server.retry_writes? && txn_num)
log_retry(original_error, message: 'Write retry')
yield(server, txn_num, true)
rescue Error::SocketError, Error::SocketTimeoutError => e
raise e
@@ -323,36 +386,13 @@
raise e
rescue
raise original_error
end
- def legacy_write_with_retry(server = nil, session = nil)
- # This is the pre-session retry logic, and is not subject to
- # current retryable write specifications.
- # In particular it does not retry on SocketError and SocketTimeoutError.
- attempt = 0
- begin
- attempt += 1
- yield(server || cluster.next_primary)
- rescue Error::OperationFailure => e
- server = nil
- if attempt > client.max_write_retries
- raise
- end
- if e.write_retryable? && !(session && session.in_transaction?)
- log_retry(e, message: 'Legacy write retry')
- cluster.scan!(false)
- retry
- else
- raise
- end
- end
- end
-
# This is a separate method to make it possible for the test suite to
# assert that server selection is performed during retry attempts.
- def select_server(cluster, server_selector)
- server_selector.select_server(cluster)
+ def select_server(cluster, server_selector, session)
+ server_selector.select_server(cluster, nil, session)
end
# Log a warning so that any application slow down is immediately obvious.
def log_retry(e, options = nil)
message = if options && options[:message]