// // // Copyright 2015 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #include #include "src/core/ext/transport/chttp2/client/chttp2_connector.h" #include #include #include #include #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_format.h" #include #include #include #include #include #include #include #include #include "src/core/client_channel/client_channel_factory.h" #include "src/core/client_channel/client_channel_filter.h" #include "src/core/client_channel/connector.h" #include "src/core/client_channel/subchannel.h" #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/channel/channel_args_preconditioning.h" #include "src/core/lib/channel/channelz.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/event_engine/channel_args_endpoint_config.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/iomgr/endpoint.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/security/credentials/credentials.h" #include "src/core/lib/security/credentials/insecure/insecure_credentials.h" #include "src/core/lib/security/security_connector/security_connector.h" #include "src/core/lib/surface/api_trace.h" #include "src/core/lib/surface/channel.h" #include "src/core/lib/surface/channel_stack_type.h" #include "src/core/lib/transport/error_utils.h" #include "src/core/lib/transport/handshaker.h" #include "src/core/lib/transport/handshaker_registry.h" #include "src/core/lib/transport/tcp_connect_handshaker.h" #include "src/core/lib/transport/transport.h" #include "src/core/resolver/resolver_registry.h" #ifdef GPR_SUPPORT_CHANNELS_FROM_FD #include #include "src/core/lib/iomgr/ev_posix.h" #include "src/core/lib/iomgr/tcp_client_posix.h" #endif // GPR_SUPPORT_CHANNELS_FROM_FD namespace grpc_core { using ::grpc_event_engine::experimental::EventEngine; namespace { void NullThenSchedClosure(const DebugLocation& location, grpc_closure** closure, grpc_error_handle error) { grpc_closure* c = *closure; *closure = nullptr; ExecCtx::Run(location, c, error); } } // namespace Chttp2Connector::~Chttp2Connector() { if (endpoint_ != nullptr) { grpc_endpoint_destroy(endpoint_); } } void Chttp2Connector::Connect(const Args& args, Result* result, grpc_closure* notify) { { MutexLock lock(&mu_); GPR_ASSERT(notify_ == nullptr); args_ = args; result_ = result; notify_ = notify; GPR_ASSERT(endpoint_ == nullptr); event_engine_ = args_.channel_args.GetObject(); } absl::StatusOr address = grpc_sockaddr_to_uri(args.address); if (!address.ok()) { grpc_error_handle error = GRPC_ERROR_CREATE(address.status().ToString()); NullThenSchedClosure(DEBUG_LOCATION, ¬ify_, error); return; } ChannelArgs channel_args = args_.channel_args .Set(GRPC_ARG_TCP_HANDSHAKER_RESOLVED_ADDRESS, address.value()) .Set(GRPC_ARG_TCP_HANDSHAKER_BIND_ENDPOINT_TO_POLLSET, 1); handshake_mgr_ = MakeRefCounted(); CoreConfiguration::Get().handshaker_registry().AddHandshakers( HANDSHAKER_CLIENT, channel_args, args_.interested_parties, handshake_mgr_.get()); Ref().release(); // Ref held by OnHandshakeDone(). handshake_mgr_->DoHandshake(nullptr /* endpoint */, channel_args, args.deadline, nullptr /* acceptor */, OnHandshakeDone, this); } void Chttp2Connector::Shutdown(grpc_error_handle error) { MutexLock lock(&mu_); shutdown_ = true; if (handshake_mgr_ != nullptr) { // Handshaker will also shutdown the endpoint if it exists handshake_mgr_->Shutdown(error); } } void Chttp2Connector::OnHandshakeDone(void* arg, grpc_error_handle error) { auto* args = static_cast(arg); Chttp2Connector* self = static_cast(args->user_data); { MutexLock lock(&self->mu_); if (!error.ok() || self->shutdown_) { if (error.ok()) { error = GRPC_ERROR_CREATE("connector shutdown"); // We were shut down after handshaking completed successfully, so // destroy the endpoint here. if (args->endpoint != nullptr) { // TODO(ctiller): It is currently necessary to shutdown endpoints // before destroying them, even if we know that there are no // pending read/write callbacks. This should be fixed, at which // point this can be removed. grpc_endpoint_shutdown(args->endpoint, error); grpc_endpoint_destroy(args->endpoint); grpc_slice_buffer_destroy(args->read_buffer); gpr_free(args->read_buffer); } } self->result_->Reset(); NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error); } else if (args->endpoint != nullptr) { self->result_->transport = grpc_create_chttp2_transport(args->args, args->endpoint, true); GPR_ASSERT(self->result_->transport != nullptr); self->result_->socket_node = grpc_chttp2_transport_get_socket_node(self->result_->transport); self->result_->channel_args = args->args; self->endpoint_ = args->endpoint; self->Ref().release(); // Ref held by OnReceiveSettings() GRPC_CLOSURE_INIT(&self->on_receive_settings_, OnReceiveSettings, self, grpc_schedule_on_exec_ctx); grpc_chttp2_transport_start_reading(self->result_->transport, args->read_buffer, &self->on_receive_settings_, nullptr); self->timer_handle_ = self->event_engine_->RunAfter( self->args_.deadline - Timestamp::Now(), [self = self->RefAsSubclass()] { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; self->OnTimeout(); }); } else { // If the handshaking succeeded but there is no endpoint, then the // handshaker may have handed off the connection to some external // code. Just verify that exit_early flag is set. GPR_DEBUG_ASSERT(args->exit_early); NullThenSchedClosure(DEBUG_LOCATION, &self->notify_, error); } self->handshake_mgr_.reset(); } self->Unref(); } void Chttp2Connector::OnReceiveSettings(void* arg, grpc_error_handle error) { Chttp2Connector* self = static_cast(arg); { MutexLock lock(&self->mu_); if (!self->notify_error_.has_value()) { grpc_endpoint_delete_from_pollset_set(self->endpoint_, self->args_.interested_parties); if (!error.ok()) { // Transport got an error while waiting on SETTINGS frame. self->result_->Reset(); } self->MaybeNotify(error); if (self->timer_handle_.has_value()) { if (self->event_engine_->Cancel(*self->timer_handle_)) { // If we have cancelled the timer successfully, call Notify() again // since the timer callback will not be called now. self->MaybeNotify(absl::OkStatus()); } self->timer_handle_.reset(); } } else { // OnTimeout() was already invoked. Call Notify() again so that notify_ // can be invoked. self->MaybeNotify(absl::OkStatus()); } } self->Unref(); } void Chttp2Connector::OnTimeout() { MutexLock lock(&mu_); timer_handle_.reset(); if (!notify_error_.has_value()) { // The transport did not receive the settings frame in time. Destroy the // transport. grpc_endpoint_delete_from_pollset_set(endpoint_, args_.interested_parties); result_->Reset(); MaybeNotify(GRPC_ERROR_CREATE( "connection attempt timed out before receiving SETTINGS frame")); } else { // OnReceiveSettings() was already invoked. Call Notify() again so that // notify_ can be invoked. MaybeNotify(absl::OkStatus()); } } void Chttp2Connector::MaybeNotify(grpc_error_handle error) { if (notify_error_.has_value()) { NullThenSchedClosure(DEBUG_LOCATION, ¬ify_, notify_error_.value()); // Clear state for a new Connect(). // Clear out the endpoint_, since it is the responsibility of // the transport to shut it down. endpoint_ = nullptr; notify_error_.reset(); } else { notify_error_ = error; } } namespace { class Chttp2SecureClientChannelFactory : public ClientChannelFactory { public: RefCountedPtr CreateSubchannel( const grpc_resolved_address& address, const ChannelArgs& args) override { absl::StatusOr new_args = GetSecureNamingChannelArgs(args); if (!new_args.ok()) { gpr_log(GPR_ERROR, "Failed to create channel args during subchannel creation: %s; " "Got args: %s", new_args.status().ToString().c_str(), args.ToString().c_str()); return nullptr; } RefCountedPtr s = Subchannel::Create( MakeOrphanable(), address, *new_args); return s; } private: static absl::StatusOr GetSecureNamingChannelArgs( ChannelArgs args) { auto* channel_credentials = args.GetObject(); if (channel_credentials == nullptr) { return absl::InternalError( "channel credentials missing for secure channel"); } // Make sure security connector does not already exist in args. if (args.Contains(GRPC_ARG_SECURITY_CONNECTOR)) { return absl::InternalError( "security connector already present in channel args."); } // Find the authority to use in the security connector. absl::optional authority = args.GetOwnedString(GRPC_ARG_DEFAULT_AUTHORITY); if (!authority.has_value()) { return absl::InternalError("authority not present in channel args"); } // Create the security connector using the credentials and target name. RefCountedPtr subchannel_security_connector = channel_credentials->create_security_connector( /*call_creds=*/nullptr, authority->c_str(), &args); if (subchannel_security_connector == nullptr) { return absl::InternalError(absl::StrFormat( "Failed to create secure subchannel for secure name '%s'", *authority)); } return args.SetObject(std::move(subchannel_security_connector)); } }; absl::StatusOr> CreateChannel(const char* target, const ChannelArgs& args) { if (target == nullptr) { gpr_log(GPR_ERROR, "cannot create channel with NULL target name"); return absl::InvalidArgumentError("channel target is NULL"); } // Add channel arg containing the server URI. std::string canonical_target = CoreConfiguration::Get().resolver_registry().AddDefaultPrefixIfNeeded( target); return Channel::Create(target, args.Set(GRPC_ARG_SERVER_URI, canonical_target), GRPC_CLIENT_CHANNEL, nullptr); } } // namespace } // namespace grpc_core namespace { grpc_core::Chttp2SecureClientChannelFactory* g_factory; gpr_once g_factory_once = GPR_ONCE_INIT; void FactoryInit() { g_factory = new grpc_core::Chttp2SecureClientChannelFactory(); } } // namespace // Create a secure client channel: // Asynchronously: - resolve target // - connect to it (trying alternatives as presented) // - perform handshakes grpc_channel* grpc_channel_create(const char* target, grpc_channel_credentials* creds, const grpc_channel_args* c_args) { grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE("grpc_secure_channel_create(target=%s, creds=%p, args=%p)", 3, (target, (void*)creds, (void*)c_args)); grpc_channel* channel = nullptr; grpc_error_handle error; if (creds != nullptr) { // Add channel args containing the client channel factory and channel // credentials. gpr_once_init(&g_factory_once, FactoryInit); grpc_core::ChannelArgs args = creds->update_arguments(grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() .PreconditionChannelArgs(c_args) .SetObject(creds->Ref()) .SetObject(g_factory)); // Create channel. auto r = grpc_core::CreateChannel(target, args); if (r.ok()) { channel = r->release()->c_ptr(); } else { error = absl_status_to_grpc_error(r.status()); } } if (channel == nullptr) { intptr_t integer; grpc_status_code status = GRPC_STATUS_INTERNAL; if (grpc_error_get_int(error, grpc_core::StatusIntProperty::kRpcStatus, &integer)) { status = static_cast(integer); } channel = grpc_lame_client_channel_create( target, status, "Failed to create secure client channel"); } return channel; } #ifdef GPR_SUPPORT_CHANNELS_FROM_FD grpc_channel* grpc_channel_create_from_fd(const char* target, int fd, grpc_channel_credentials* creds, const grpc_channel_args* args) { grpc_core::ExecCtx exec_ctx; GRPC_API_TRACE( "grpc_channel_create_from_fd(target=%p, fd=%d, creds=%p, args=%p)", 4, (target, fd, creds, args)); // For now, we only support insecure channel credentials. if (creds == nullptr || creds->type() != grpc_core::InsecureCredentials::Type()) { return grpc_lame_client_channel_create( target, GRPC_STATUS_INTERNAL, "Failed to create client channel due to invalid creds"); } grpc_core::ChannelArgs final_args = grpc_core::CoreConfiguration::Get() .channel_args_preconditioning() .PreconditionChannelArgs(args) .SetIfUnset(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority") .SetObject(creds->Ref()); int flags = fcntl(fd, F_GETFL, 0); GPR_ASSERT(fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0); grpc_endpoint* client = grpc_tcp_create_from_fd( grpc_fd_create(fd, "client", true), grpc_event_engine::experimental::ChannelArgsEndpointConfig(final_args), "fd-client"); grpc_core::Transport* transport = grpc_create_chttp2_transport(final_args, client, true); GPR_ASSERT(transport); auto channel = grpc_core::Channel::Create( target, final_args, GRPC_CLIENT_DIRECT_CHANNEL, transport); if (channel.ok()) { grpc_chttp2_transport_start_reading(transport, nullptr, nullptr, nullptr); grpc_core::ExecCtx::Get()->Flush(); return channel->release()->c_ptr(); } else { transport->Orphan(); return grpc_lame_client_channel_create( target, static_cast(channel.status().code()), "Failed to create client channel"); } } #else // !GPR_SUPPORT_CHANNELS_FROM_FD grpc_channel* grpc_channel_create_from_fd(const char* /* target */, int /* fd */, grpc_channel_credentials* /* creds*/, const grpc_channel_args* /* args */) { GPR_ASSERT(0); return nullptr; } #endif // GPR_SUPPORT_CHANNELS_FROM_FD