lib/mongo/operation/shared/executable.rb in mongo-2.19.3 vs lib/mongo/operation/shared/executable.rb in mongo-2.20.0

- old
+ new

@@ -13,10 +13,12 @@ # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +require 'mongo/error' + module Mongo module Operation # Shared executable behavior of operations. # @@ -28,44 +30,46 @@ def do_execute(connection, context, options = {}) session&.materialize_if_needed unpin_maybe(session, connection) do add_error_labels(connection, context) do - add_server_diagnostics(connection) do - get_result(connection, context, options).tap do |result| - if session - if session.in_transaction? && + check_for_network_error do + add_server_diagnostics(connection) do + get_result(connection, context, options).tap do |result| + if session + if session.in_transaction? && + connection.description.load_balancer? + then + if session.pinned_connection_global_id + unless session.pinned_connection_global_id == connection.global_id + raise( + Error::InternalDriverError, + "Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}" + ) + end + else + session.pin_to_connection(connection.global_id) + connection.pin + end + end + + if session.snapshot? && !session.snapshot_timestamp + session.snapshot_timestamp = result.snapshot_timestamp + end + end + + if result.has_cursor_id? && connection.description.load_balancer? then - if session.pinned_connection_global_id - unless session.pinned_connection_global_id == connection.global_id - raise( - Error::InternalDriverError, - "Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}" - ) - end + if result.cursor_id == 0 + connection.unpin else - session.pin_to_connection(connection.global_id) connection.pin end end - - if session.snapshot? && !session.snapshot_timestamp - session.snapshot_timestamp = result.snapshot_timestamp - end + process_result(result, connection) end - - if result.has_cursor_id? && - connection.description.load_balancer? - then - if result.cursor_id == 0 - connection.unpin - else - connection.pin - end - end - process_result(result, connection) end end end end end @@ -141,9 +145,21 @@ topology_version: result.topology_version, ) connection.server.scan_semaphore.signal end + end + + NETWORK_ERRORS = [ + Error::SocketError, + Error::SocketTimeoutError + ].freeze + + def check_for_network_error + yield + rescue *NETWORK_ERRORS + session&.dirty! + raise end end end end