lib/mongo/operation/shared/executable.rb in mongo-2.12.4 vs lib/mongo/operation/shared/executable.rb in mongo-2.13.0.beta1
- old
+ new
@@ -1,6 +1,6 @@
-# Copyright (C) 2015-2019 MongoDB, Inc.
+# Copyright (C) 2015-2020 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
@@ -20,68 +20,86 @@
# @since 2.5.2
module Executable
include ResponseHandling
- def do_execute(server, client, options = {})
+ def do_execute(connection, client, options = {})
unpin_maybe(session) do
- add_error_labels do
- add_server_diagnostics(server) do
- get_result(server, client, options).tap do |result|
- process_result(result, server)
+ add_error_labels(client, connection, session) do
+ add_server_diagnostics(connection) do
+ get_result(connection, client, options).tap do |result|
+ process_result(result, connection)
end
end
end
end
end
- def execute(server, client:, options: {})
- do_execute(server, client, options).tap do |result|
- validate_result(result, server)
+ def execute(connection, client:, options: {})
+ if Lint.enabled?
+ unless connection.is_a?(Mongo::Server::Connection)
+ raise Error::LintError, "Connection argument is of wrong type: #{connection}"
+ end
end
+
+ do_execute(connection, client, options).tap do |result|
+ validate_result(result, client, connection)
+ end
end
private
def result_class
Result
end
- def get_result(server, client, options = {})
- result_class.new(dispatch_message(server, client, options))
+ def get_result(connection, client, options = {})
+ result_class.new(*dispatch_message(connection, client, options))
end
# Returns a Protocol::Message or nil as reply.
- def dispatch_message(server, client, options = {})
- server.with_connection do |connection|
- message = build_message(server)
- message = message.maybe_encrypt(server, client)
- connection.dispatch([ message ], operation_id, client, options)
- end
+ def dispatch_message(connection, client, options = {})
+ message = build_message(connection)
+ message = message.maybe_encrypt(connection, client)
+ reply = connection.dispatch([ message ], operation_id, client, options)
+ [reply, connection.description]
end
- def build_message(server)
- message(server)
+ def build_message(connection)
+ message(connection)
end
- def process_result(result, server)
- server.update_cluster_time(result)
+ def process_result(result, connection)
+ connection.server.update_cluster_time(result)
- if result.not_master? || result.node_recovering?
+ process_result_for_sdam(result, connection)
+
+ if session
+ session.process(result)
+ end
+
+ result
+ end
+
+ def process_result_for_sdam(result, connection)
+ if (result.not_master? || result.node_recovering?) &&
+ connection.generation >= connection.server.pool.generation
+ then
if result.node_shutting_down?
keep_pool = false
else
# Max wire version needs to be examined while the server is known
- keep_pool = server.description.server_version_gte?('4.2')
+ keep_pool = connection.description.server_version_gte?('4.2')
end
- server.unknown!(keep_connection_pool: keep_pool)
+ connection.server.unknown!(
+ keep_connection_pool: keep_pool,
+ generation: connection.generation,
+ topology_version: result.topology_version,
+ )
- server.scan_semaphore.signal
+ connection.server.scan_semaphore.signal
end
-
- session.process(result) if session
- result
end
end
end
end