lib/mongo/operation/shared/executable.rb in mongo-2.19.3 vs lib/mongo/operation/shared/executable.rb in mongo-2.20.0
- old
+ new
@@ -13,10 +13,12 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+require 'mongo/error'
+
module Mongo
module Operation
# Shared executable behavior of operations.
#
@@ -28,44 +30,46 @@
def do_execute(connection, context, options = {})
session&.materialize_if_needed
unpin_maybe(session, connection) do
add_error_labels(connection, context) do
- add_server_diagnostics(connection) do
- get_result(connection, context, options).tap do |result|
- if session
- if session.in_transaction? &&
+ check_for_network_error do
+ add_server_diagnostics(connection) do
+ get_result(connection, context, options).tap do |result|
+ if session
+ if session.in_transaction? &&
+ connection.description.load_balancer?
+ then
+ if session.pinned_connection_global_id
+ unless session.pinned_connection_global_id == connection.global_id
+ raise(
+ Error::InternalDriverError,
+ "Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
+ )
+ end
+ else
+ session.pin_to_connection(connection.global_id)
+ connection.pin
+ end
+ end
+
+ if session.snapshot? && !session.snapshot_timestamp
+ session.snapshot_timestamp = result.snapshot_timestamp
+ end
+ end
+
+ if result.has_cursor_id? &&
connection.description.load_balancer?
then
- if session.pinned_connection_global_id
- unless session.pinned_connection_global_id == connection.global_id
- raise(
- Error::InternalDriverError,
- "Expected operation to use connection #{session.pinned_connection_global_id} but it used #{connection.global_id}"
- )
- end
+ if result.cursor_id == 0
+ connection.unpin
else
- session.pin_to_connection(connection.global_id)
connection.pin
end
end
-
- if session.snapshot? && !session.snapshot_timestamp
- session.snapshot_timestamp = result.snapshot_timestamp
- end
+ process_result(result, connection)
end
-
- if result.has_cursor_id? &&
- connection.description.load_balancer?
- then
- if result.cursor_id == 0
- connection.unpin
- else
- connection.pin
- end
- end
- process_result(result, connection)
end
end
end
end
end
@@ -141,9 +145,21 @@
topology_version: result.topology_version,
)
connection.server.scan_semaphore.signal
end
+ end
+
+ NETWORK_ERRORS = [
+ Error::SocketError,
+ Error::SocketTimeoutError
+ ].freeze
+
+ def check_for_network_error
+ yield
+ rescue *NETWORK_ERRORS
+ session&.dirty!
+ raise
end
end
end
end