lib/mongo/retryable.rb in mongo-2.9.2 vs lib/mongo/retryable.rb in mongo-2.10.0.rc0

- old
+ new

@@ -116,11 +116,11 @@ elsif session && session.retry_reads? modern_read_with_retry(session, server_selector, &block) elsif client.max_read_retries > 0 legacy_read_with_retry(session, server_selector, &block) else - server = select_server(cluster, server_selector) + server = select_server(cluster, server_selector, session) yield server end end # Execute a read operation with a single retry on network errors. @@ -198,11 +198,11 @@ end # If we are here, session is not nil. A session being nil would have # failed retry_write_allowed? check. - server = cluster.next_primary + server = select_server(cluster, ServerSelector.primary, session) unless ending_transaction || server.retry_writes? return legacy_write_with_retry(server, session, &block) end @@ -211,60 +211,123 @@ yield(server, txn_num, false) rescue Error::SocketError, Error::SocketTimeoutError => e if session.in_transaction? && !ending_transaction raise end - retry_write(e, txn_num, &block) + retry_write(e, session, txn_num, &block) rescue Error::OperationFailure => e if (session.in_transaction? && !ending_transaction) || !e.write_retryable? raise end - retry_write(e, txn_num, &block) + retry_write(e, session, txn_num, &block) end end + # Retryable writes wrapper for operations not supporting modern retryable + # writes. + # + # If the driver is configured to use modern retryable writes, this method + # yields to the passed block exactly once, thus not retrying any writes. + # + # If the driver is configured to use legacy retryable writes, this method + # delegates to legacy_write_with_retry which performs write retries using + # legacy logic. + # + # @param [ nil | Session ] session Optional session to use with the operation. + # @param [ nil | Hash | WriteConcern::Base ] write_concern The write concern. + # + # @yieldparam [ Server ] server The server to which the write should be sent. + # + # @api private + def nro_write_with_retry(session, write_concern, &block) + if session && session.client.options[:retry_writes] + server = select_server(cluster, ServerSelector.primary, session) + yield server + else + legacy_write_with_retry(nil, session, &block) + end + end + + # Implements legacy write retrying functionality by yielding to the passed + # block one or more times. + # + # This method is used for operations which are not supported by modern + # retryable writes, such as delete_many and update_many. + # + # @param [ Server ] server The server which should be used for the + # operation. If not provided, the current primary will be retrieved from + # the cluster. + # @param [ nil | Session ] session Optional session to use with the operation. + # + # @yieldparam [ Server ] server The server to which the write should be sent. + # + # @api private + def legacy_write_with_retry(server = nil, session = nil) + # This is the pre-session retry logic, and is not subject to + # current retryable write specifications. + # In particular it does not retry on SocketError and SocketTimeoutError. + attempt = 0 + begin + attempt += 1 + server ||= select_server(cluster, ServerSelector.primary, session) + yield server + rescue Error::OperationFailure => e + server = nil + if attempt > client.max_write_retries + raise + end + if e.write_retryable? && !(session && session.in_transaction?) + log_retry(e, message: 'Legacy write retry') + cluster.scan!(false) + retry + else + raise + end + end + end + private def modern_read_with_retry(session, server_selector, &block) attempt = 0 - server = select_server(cluster, server_selector) + server = select_server(cluster, server_selector, session) begin yield server rescue Error::SocketError, Error::SocketTimeoutError => e if session.in_transaction? raise end - retry_read(e, server_selector, &block) + retry_read(e, server_selector, session, &block) rescue Error::OperationFailure => e if session.in_transaction? || !e.write_retryable? raise end - retry_read(e, server_selector, &block) + retry_read(e, server_selector, session, &block) end end def legacy_read_with_retry(session, server_selector) attempt = 0 - server = select_server(cluster, server_selector) + server = select_server(cluster, server_selector, session) begin attempt += 1 yield server rescue Error::SocketError, Error::SocketTimeoutError => e if attempt > client.max_read_retries || (session && session.in_transaction?) raise end log_retry(e, message: 'Legacy read retry') - server = select_server(cluster, server_selector) + server = select_server(cluster, server_selector, session) retry rescue Error::OperationFailure => e if cluster.sharded? && e.retryable? && !(session && session.in_transaction?) if attempt > client.max_read_retries raise end log_retry(e, message: 'Legacy read retry') sleep(client.read_retry_interval) - server = select_server(cluster, server_selector) + server = select_server(cluster, server_selector, session) retry else raise end end @@ -283,13 +346,13 @@ end write_concern.acknowledged? end end - def retry_read(original_error, server_selector, &block) + def retry_read(original_error, server_selector, session, &block) begin - server = select_server(cluster, server_selector) + server = select_server(cluster, server_selector, session) rescue raise original_error end log_retry(original_error, message: 'Read retry') @@ -304,17 +367,17 @@ rescue raise original_error end end - def retry_write(original_error, txn_num, &block) + def retry_write(original_error, session, txn_num, &block) # We do not request a scan of the cluster here, because error handling # for the error which triggered the retry should have updated the # server description and/or topology as necessary (specifically, # a socket error or a not master error should have marked the respective # server unknown). Here we just need to wait for server selection. - server = cluster.next_primary + server = select_server(cluster, ServerSelector.primary, session) raise original_error unless (server.retry_writes? && txn_num) log_retry(original_error, message: 'Write retry') yield(server, txn_num, true) rescue Error::SocketError, Error::SocketTimeoutError => e raise e @@ -323,36 +386,13 @@ raise e rescue raise original_error end - def legacy_write_with_retry(server = nil, session = nil) - # This is the pre-session retry logic, and is not subject to - # current retryable write specifications. - # In particular it does not retry on SocketError and SocketTimeoutError. - attempt = 0 - begin - attempt += 1 - yield(server || cluster.next_primary) - rescue Error::OperationFailure => e - server = nil - if attempt > client.max_write_retries - raise - end - if e.write_retryable? && !(session && session.in_transaction?) - log_retry(e, message: 'Legacy write retry') - cluster.scan!(false) - retry - else - raise - end - end - end - # This is a separate method to make it possible for the test suite to # assert that server selection is performed during retry attempts. - def select_server(cluster, server_selector) - server_selector.select_server(cluster) + def select_server(cluster, server_selector, session) + server_selector.select_server(cluster, nil, session) end # Log a warning so that any application slow down is immediately obvious. def log_retry(e, options = nil) message = if options && options[:message]