lib/mongo/retryable.rb in mongo-2.8.0 vs lib/mongo/retryable.rb in mongo-2.9.0.rc0

- old
+ new

@@ -17,55 +17,120 @@ # Defines basic behavior around retrying operations. # # @since 2.1.0 module Retryable - # Execute a read operation with a retry. + # Execute a read operation returning a cursor with retrying. # + # This method performs server selection for the specified server selector + # and yields to the provided block, which should execute the initial + # query operation and return its result. The block will be passed the + # server selected for the operation. If the block raises an exception, + # and this exception corresponds to a read retryable error, and read + # retries are enabled for the client, this method will perform server + # selection again and yield to the block again (with potentially a + # different server). If the block returns successfully, the result + # of the block (which should be a Mongo::Operation::Result) is used to + # construct a Mongo::Cursor object for the result set. The cursor + # is then returned. + # + # If modern retry reads are on (which is the default), the initial read + # operation will be retried once. If legacy retry reads are on, the + # initial read operation will be retried zero or more times depending + # on the :max_read_retries client setting, the default for which is 1. + # To disable read retries, turn off modern read retries by setting + # retry_reads: false and set :max_read_retries to 0 on the client. + # # @api private # - # @example Execute the read. - # read_with_retry do + # @example Execute a read returning a cursor. + # cursor = read_with_retry_cursor(session, server_selector, view) do |server| + # # return a Mongo::Operation::Result # ... # end # - # @note This only retries read operations on socket errors. + # @param [ Mongo::Session ] session The session that the operation is being + # run on. + # @param [ Mongo::ServerSelector::Selectable ] server_selector Server + # selector for the operation. + # @param [ CollectionView ] view The +CollectionView+ defining the query. + # @param [ Proc ] block The block to execute. # - # @param [ Mongo::Session ] session The session that the operation is being run on. + # @return [ Cursor ] The cursor for the result set. + def read_with_retry_cursor(session, server_selector, view, &block) + read_with_retry(session, server_selector) do |server| + result = yield server + Cursor.new(view, result, server, session: session) + end + end + + # Execute a read operation with retrying. + # + # This method performs server selection for the specified server selector + # and yields to the provided block, which should execute the initial + # query operation and return its result. The block will be passed the + # server selected for the operation. If the block raises an exception, + # and this exception corresponds to a read retryable error, and read + # retries are enabled for the client, this method will perform server + # selection again and yield to the block again (with potentially a + # different server). If the block returns successfully, the result + # of the block is returned. + # + # If modern retry reads are on (which is the default), the initial read + # operation will be retried once. If legacy retry reads are on, the + # initial read operation will be retried zero or more times depending + # on the :max_read_retries client setting, the default for which is 1. + # To disable read retries, turn off modern read retries by setting + # retry_reads: false and set :max_read_retries to 0 on the client. + # + # @api private + # + # @example Execute the read. + # read_with_retry(session, server_selector) do |server| + # ... + # end + # + # @param [ Mongo::Session ] session The session that the operation is being + # run on. + # @param [ Mongo::ServerSelector::Selectable ] server_selector Server + # selector for the operation. # @param [ Proc ] block The block to execute. # # @return [ Result ] The result of the operation. - # - # @since 2.1.0 - def read_with_retry(session = nil) - attempt = 0 - begin - attempt += 1 - yield - rescue Error::SocketError, Error::SocketTimeoutError => e - if attempt > cluster.max_read_retries || (session && session.in_transaction?) - raise + def read_with_retry(session = nil, server_selector = nil, &block) + if session.nil? && server_selector.nil? + # Older versions of Mongoid call read_with_retry without arguments. + # This is already not correct in a MongoDB 3.6+ environment with + # sessions. For compatibility we emulate the legacy driver behavior + # here but upgrading Mongoid is strongly recommended. + unless $_mongo_read_with_retry_warned + $_mongo_read_with_retry_warned = true + Logger.logger.warn("Legacy read_with_retry invocation - please update the application and/or its dependencies") end - log_retry(e) - cluster.scan!(false) - retry - rescue Error::OperationFailure => e - if cluster.sharded? && e.retryable? && !(session && session.in_transaction?) - if attempt > cluster.max_read_retries - raise - end - log_retry(e) - sleep(cluster.read_retry_interval) - retry - else - raise - end + # Since we don't have a session, we cannot use the modern read retries. + # And we need to select a server but we don't have a server selector. + # Use PrimaryPreferred which will work as long as there is a data + # bearing node in the cluster; the block may select a different server + # which is fine. + server_selector = ServerSelector.get(mode: :primary_preferred) + legacy_read_with_retry(nil, server_selector, &block) + elsif session && session.retry_reads? + modern_read_with_retry(session, server_selector, &block) + elsif client.max_read_retries > 0 + legacy_read_with_retry(session, server_selector, &block) + else + server = select_server(cluster, server_selector) + yield server end end - # Execute a read operation with a single retry. + # Execute a read operation with a single retry on network errors. # + # This method is used by the driver for some of the internal housekeeping + # operations. Application-requested reads should use read_with_retry + # rather than this method. + # # @api private # # @example Execute the read. # read_with_one_retry do # ... @@ -157,10 +222,56 @@ end end private + def modern_read_with_retry(session, server_selector, &block) + attempt = 0 + server = select_server(cluster, server_selector) + begin + yield server + rescue Error::SocketError, Error::SocketTimeoutError => e + if session.in_transaction? + raise + end + retry_read(e, server_selector, &block) + rescue Error::OperationFailure => e + if session.in_transaction? || !e.write_retryable? + raise + end + retry_read(e, server_selector, &block) + end + end + + def legacy_read_with_retry(session, server_selector) + attempt = 0 + server = select_server(cluster, server_selector) + begin + attempt += 1 + yield server + rescue Error::SocketError, Error::SocketTimeoutError => e + if attempt > client.max_read_retries || (session && session.in_transaction?) + raise + end + log_retry(e, message: 'Legacy read retry') + server = select_server(cluster, server_selector) + retry + rescue Error::OperationFailure => e + if cluster.sharded? && e.retryable? && !(session && session.in_transaction?) + if attempt > client.max_read_retries + raise + end + log_retry(e, message: 'Legacy read retry') + sleep(client.read_retry_interval) + server = select_server(cluster, server_selector) + retry + else + raise + end + end + end + def retry_write_allowed?(session, write_concern) unless session && session.retry_writes? return false end @@ -172,19 +283,40 @@ end write_concern.acknowledged? end end + def retry_read(original_error, server_selector, &block) + begin + server = select_server(cluster, server_selector) + rescue + raise original_error + end + + log_retry(original_error, message: 'Read retry') + + begin + yield server, true + rescue Error::SocketError, Error::SocketTimeoutError => e + raise e + rescue Error::OperationFailure => e + raise original_error unless e.write_retryable? + raise e + rescue + raise original_error + end + end + def retry_write(original_error, txn_num, &block) # We do not request a scan of the cluster here, because error handling # for the error which triggered the retry should have updated the # server description and/or topology as necessary (specifically, # a socket error or a not master error should have marked the respective # server unknown). Here we just need to wait for server selection. server = cluster.next_primary raise original_error unless (server.retry_writes? && txn_num) - log_retry(original_error) + log_retry(original_error, message: 'Write retry') yield(server, txn_num, true) rescue Error::SocketError, Error::SocketTimeoutError => e raise e rescue Error::OperationFailure => e raise original_error unless e.write_retryable? @@ -201,20 +333,26 @@ begin attempt += 1 yield(server || cluster.next_primary) rescue Error::OperationFailure => e server = nil - if attempt > Cluster::MAX_WRITE_RETRIES + if attempt > client.max_write_retries raise end if e.write_retryable? && !(session && session.in_transaction?) - log_retry(e) + log_retry(e, message: 'Legacy write retry') cluster.scan!(false) retry else raise end end + end + + # This is a separate method to make it possible for the test suite to + # assert that server selection is performed during retry attempts. + def select_server(cluster, server_selector) + server_selector.select_server(cluster) end # Log a warning so that any application slow down is immediately obvious. def log_retry(e, options = nil) message = if options && options[:message]