// // Copyright 2019 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include #include "absl/strings/str_cat.h" #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/xds/xds_certificate_provider.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_cluster.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/memory.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/security/credentials/xds/xds_credentials.h" #include "src/core/lib/transport/error_utils.h" namespace grpc_core { TraceFlag grpc_cds_lb_trace(false, "cds_lb"); namespace { constexpr char kCds[] = "cds_experimental"; // Config for this LB policy. class CdsLbConfig : public LoadBalancingPolicy::Config { public: explicit CdsLbConfig(std::string cluster) : cluster_(std::move(cluster)) {} const std::string& cluster() const { return cluster_; } const char* name() const override { return kCds; } private: std::string cluster_; }; // CDS LB policy. class CdsLb : public LoadBalancingPolicy { public: CdsLb(RefCountedPtr xds_client, Args args); const char* name() const override { return kCds; } void UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; void ExitIdleLocked() override; private: // Watcher for getting cluster data from XdsClient. class ClusterWatcher : public XdsClusterResourceType::WatcherInterface { public: ClusterWatcher(RefCountedPtr parent, std::string name) : parent_(std::move(parent)), name_(std::move(name)) {} void OnResourceChanged(XdsClusterResource cluster_data) override { Ref().release(); // Ref held by lambda parent_->work_serializer()->Run( // TODO(roth): When we move to C++14, capture cluster_data with // std::move(). [this, cluster_data]() mutable { parent_->OnClusterChanged(name_, std::move(cluster_data)); Unref(); }, DEBUG_LOCATION); } void OnError(grpc_error_handle error) override { Ref().release(); // Ref held by lambda parent_->work_serializer()->Run( [this, error]() { parent_->OnError(name_, error); Unref(); }, DEBUG_LOCATION); } void OnResourceDoesNotExist() override { Ref().release(); // Ref held by lambda parent_->work_serializer()->Run( [this]() { parent_->OnResourceDoesNotExist(name_); Unref(); }, DEBUG_LOCATION); } private: RefCountedPtr parent_; std::string name_; }; struct WatcherState { // Pointer to watcher, to be used when cancelling. // Not owned, so do not dereference. ClusterWatcher* watcher = nullptr; // Most recent update obtained from this watcher. absl::optional update; }; // Delegating helper to be passed to child policy. class Helper : public ChannelControlHelper { public: explicit Helper(RefCountedPtr parent) : parent_(std::move(parent)) {} RefCountedPtr CreateSubchannel( ServerAddress address, const grpc_channel_args& args) override; void UpdateState(grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; private: RefCountedPtr parent_; }; ~CdsLb() override; void ShutdownLocked() override; bool GenerateDiscoveryMechanismForCluster( const std::string& name, Json::Array* discovery_mechanisms, std::set* clusters_needed); void OnClusterChanged(const std::string& name, XdsClusterResource cluster_data); void OnError(const std::string& name, grpc_error_handle error); void OnResourceDoesNotExist(const std::string& name); grpc_error_handle UpdateXdsCertificateProvider( const std::string& cluster_name, const XdsClusterResource& cluster_data); void CancelClusterDataWatch(absl::string_view cluster_name, ClusterWatcher* watcher, bool delay_unsubscription = false); void MaybeDestroyChildPolicyLocked(); RefCountedPtr config_; // Current channel args from the resolver. const grpc_channel_args* args_ = nullptr; // The xds client. RefCountedPtr xds_client_; // Maps from cluster name to the state for that cluster. // The root of the tree is config_->cluster(). std::map watchers_; RefCountedPtr root_certificate_provider_; RefCountedPtr identity_certificate_provider_; RefCountedPtr xds_certificate_provider_; // Child LB policy. OrphanablePtr child_policy_; // Internal state. bool shutting_down_ = false; }; // // CdsLb::Helper // RefCountedPtr CdsLb::Helper::CreateSubchannel( ServerAddress address, const grpc_channel_args& args) { if (parent_->shutting_down_) return nullptr; return parent_->channel_control_helper()->CreateSubchannel(std::move(address), args); } void CdsLb::Helper::UpdateState(grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) { if (parent_->shutting_down_ || parent_->child_policy_ == nullptr) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] state updated by child: %s message_state: (%s)", this, ConnectivityStateName(state), status.ToString().c_str()); } parent_->channel_control_helper()->UpdateState(state, status, std::move(picker)); } void CdsLb::Helper::RequestReresolution() { if (parent_->shutting_down_) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] Re-resolution requested from child policy.", parent_.get()); } parent_->channel_control_helper()->RequestReresolution(); } absl::string_view CdsLb::Helper::GetAuthority() { return parent_->channel_control_helper()->GetAuthority(); } void CdsLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (parent_->shutting_down_) return; parent_->channel_control_helper()->AddTraceEvent(severity, message); } // // CdsLb // CdsLb::CdsLb(RefCountedPtr xds_client, Args args) : LoadBalancingPolicy(std::move(args)), xds_client_(std::move(xds_client)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] created -- using xds client %p", this, xds_client_.get()); } } CdsLb::~CdsLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] destroying cds LB policy", this); } } void CdsLb::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] shutting down", this); } shutting_down_ = true; MaybeDestroyChildPolicyLocked(); if (xds_client_ != nullptr) { for (auto& watcher : watchers_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, watcher.first.c_str()); } CancelClusterDataWatch(watcher.first, watcher.second.watcher, /*delay_unsubscription=*/false); } watchers_.clear(); xds_client_.reset(DEBUG_LOCATION, "CdsLb"); } grpc_channel_args_destroy(args_); args_ = nullptr; } void CdsLb::MaybeDestroyChildPolicyLocked() { if (child_policy_ != nullptr) { grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), interested_parties()); child_policy_.reset(); } } void CdsLb::ResetBackoffLocked() { if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); } void CdsLb::ExitIdleLocked() { if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); } void CdsLb::UpdateLocked(UpdateArgs args) { // Update config. auto old_config = std::move(config_); config_ = std::move(args.config); if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] received update: cluster=%s", this, config_->cluster().c_str()); } // Update args. grpc_channel_args_destroy(args_); args_ = args.args; args.args = nullptr; // If cluster name changed, cancel watcher and restart. if (old_config == nullptr || old_config->cluster() != config_->cluster()) { if (old_config != nullptr) { for (auto& watcher : watchers_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, watcher.first.c_str()); } CancelClusterDataWatch(watcher.first, watcher.second.watcher, /*delay_unsubscription=*/true); } watchers_.clear(); } auto watcher = MakeRefCounted(Ref(), config_->cluster()); watchers_[config_->cluster()].watcher = watcher.get(); XdsClusterResourceType::StartWatch(xds_client_.get(), config_->cluster(), std::move(watcher)); } } // This method will attempt to generate one or multiple entries of discovery // mechanism recursively: // For cluster types EDS or LOGICAL_DNS, one discovery mechanism entry may be // generated cluster name, type and other data from the CdsUpdate inserted into // the entry and the entry appended to the array of entries. // Note, discovery mechanism entry can be generated if an CdsUpdate is // available; otherwise, just return false. For cluster type AGGREGATE, // recursively call the method for each child cluster. bool CdsLb::GenerateDiscoveryMechanismForCluster( const std::string& name, Json::Array* discovery_mechanisms, std::set* clusters_needed) { clusters_needed->insert(name); auto& state = watchers_[name]; // Create a new watcher if needed. if (state.watcher == nullptr) { auto watcher = MakeRefCounted(Ref(), name); if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] starting watch for cluster %s", this, name.c_str()); } state.watcher = watcher.get(); XdsClusterResourceType::StartWatch(xds_client_.get(), name, std::move(watcher)); return false; } // Don't have the update we need yet. if (!state.update.has_value()) return false; // For AGGREGATE clusters, recursively expand to child clusters. if (state.update->cluster_type == XdsClusterResource::ClusterType::AGGREGATE) { bool missing_cluster = false; for (const std::string& child_name : state.update->prioritized_cluster_names) { if (!GenerateDiscoveryMechanismForCluster( child_name, discovery_mechanisms, clusters_needed)) { missing_cluster = true; } } return !missing_cluster; } Json::Object mechanism = { {"clusterName", name}, {"max_concurrent_requests", state.update->max_concurrent_requests}, }; switch (state.update->cluster_type) { case XdsClusterResource::ClusterType::EDS: mechanism["type"] = "EDS"; if (!state.update->eds_service_name.empty()) { mechanism["edsServiceName"] = state.update->eds_service_name; } break; case XdsClusterResource::ClusterType::LOGICAL_DNS: mechanism["type"] = "LOGICAL_DNS"; mechanism["dnsHostname"] = state.update->dns_hostname; break; default: GPR_ASSERT(0); break; } if (state.update->lrs_load_reporting_server_name.has_value()) { mechanism["lrsLoadReportingServerName"] = state.update->lrs_load_reporting_server_name.value(); } discovery_mechanisms->emplace_back(std::move(mechanism)); return true; } void CdsLb::OnClusterChanged(const std::string& name, XdsClusterResource cluster_data) { if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log( GPR_INFO, "[cdslb %p] received CDS update for cluster %s from xds client %p: %s", this, name.c_str(), xds_client_.get(), cluster_data.ToString().c_str()); } // Store the update in the map if we are still interested in watching this // cluster (i.e., it is not cancelled already). // If we've already deleted this entry, then this is an update notification // that was scheduled before the deletion, so we can just ignore it. auto it = watchers_.find(name); if (it == watchers_.end()) return; it->second.update = cluster_data; // Take care of integration with new certificate code. grpc_error_handle error = GRPC_ERROR_NONE; error = UpdateXdsCertificateProvider(name, it->second.update.value()); if (error != GRPC_ERROR_NONE) { return OnError(name, error); } // Scan the map starting from the root cluster to generate the list of // discovery mechanisms. If we don't have some of the data we need (i.e., we // just started up and not all watchers have returned data yet), then don't // update the child policy at all. Json::Array discovery_mechanisms; std::set clusters_needed; if (GenerateDiscoveryMechanismForCluster( config_->cluster(), &discovery_mechanisms, &clusters_needed)) { // Construct config for child policy. Json::Object xds_lb_policy; if (cluster_data.lb_policy == "RING_HASH") { xds_lb_policy["RING_HASH"] = Json::Object{ {"min_ring_size", cluster_data.min_ring_size}, {"max_ring_size", cluster_data.max_ring_size}, }; } else { xds_lb_policy["ROUND_ROBIN"] = Json::Object(); } Json::Object child_config = { {"xdsLbPolicy", Json::Array{ xds_lb_policy, }}, {"discoveryMechanisms", std::move(discovery_mechanisms)}, }; Json json = Json::Array{ Json::Object{ {"xds_cluster_resolver_experimental", std::move(child_config)}, }, }; if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { std::string json_str = json.Dump(/*indent=*/1); gpr_log(GPR_INFO, "[cdslb %p] generated config for child policy: %s", this, json_str.c_str()); } RefCountedPtr config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(json, &error); if (error != GRPC_ERROR_NONE) { OnError(name, error); return; } // Create child policy if not already present. if (child_policy_ == nullptr) { LoadBalancingPolicy::Args args; args.work_serializer = work_serializer(); args.args = args_; args.channel_control_helper = absl::make_unique(Ref()); child_policy_ = LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy( config->name(), std::move(args)); if (child_policy_ == nullptr) { OnError(name, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "failed to create child policy")); return; } grpc_pollset_set_add_pollset_set(child_policy_->interested_parties(), interested_parties()); if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] created child policy %s (%p)", this, config->name(), child_policy_.get()); } } // Update child policy. UpdateArgs args; args.config = std::move(config); if (xds_certificate_provider_ != nullptr) { grpc_arg arg_to_add = xds_certificate_provider_->MakeChannelArg(); args.args = grpc_channel_args_copy_and_add(args_, &arg_to_add, 1); } else { args.args = grpc_channel_args_copy(args_); } child_policy_->UpdateLocked(std::move(args)); } // Remove entries in watchers_ for any clusters not in clusters_needed for (auto it = watchers_.begin(); it != watchers_.end();) { const std::string& cluster_name = it->first; if (clusters_needed.find(cluster_name) != clusters_needed.end()) { ++it; continue; } if (GRPC_TRACE_FLAG_ENABLED(grpc_cds_lb_trace)) { gpr_log(GPR_INFO, "[cdslb %p] cancelling watch for cluster %s", this, cluster_name.c_str()); } CancelClusterDataWatch(cluster_name, it->second.watcher, /*delay_unsubscription=*/false); it = watchers_.erase(it); } } void CdsLb::OnError(const std::string& name, grpc_error_handle error) { gpr_log(GPR_ERROR, "[cdslb %p] xds error obtaining data for cluster %s: %s", this, name.c_str(), grpc_error_std_string(error).c_str()); // Go into TRANSIENT_FAILURE if we have not yet created the child // policy (i.e., we have not yet received data from xds). Otherwise, // we keep running with the data we had previously. if (child_policy_ == nullptr) { absl::Status status = grpc_error_to_absl_status(error); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); } GRPC_ERROR_UNREF(error); } void CdsLb::OnResourceDoesNotExist(const std::string& name) { gpr_log(GPR_ERROR, "[cdslb %p] CDS resource for %s does not exist -- reporting " "TRANSIENT_FAILURE", this, name.c_str()); absl::Status status = absl::UnavailableError( absl::StrCat("CDS resource \"", config_->cluster(), "\" does not exist")); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, absl::make_unique(status)); MaybeDestroyChildPolicyLocked(); } grpc_error_handle CdsLb::UpdateXdsCertificateProvider( const std::string& cluster_name, const XdsClusterResource& cluster_data) { // Early out if channel is not configured to use xds security. grpc_channel_credentials* channel_credentials = grpc_channel_credentials_find_in_args(args_); if (channel_credentials == nullptr || channel_credentials->type() != kCredentialsTypeXds) { xds_certificate_provider_ = nullptr; return GRPC_ERROR_NONE; } if (xds_certificate_provider_ == nullptr) { xds_certificate_provider_ = MakeRefCounted(); } // Configure root cert. absl::string_view root_provider_instance_name = cluster_data.common_tls_context.certificate_validation_context .ca_certificate_provider_instance.instance_name; absl::string_view root_provider_cert_name = cluster_data.common_tls_context.certificate_validation_context .ca_certificate_provider_instance.certificate_name; RefCountedPtr new_root_provider; if (!root_provider_instance_name.empty()) { new_root_provider = xds_client_->certificate_provider_store() .CreateOrGetCertificateProvider(root_provider_instance_name); if (new_root_provider == nullptr) { return grpc_error_set_int( GRPC_ERROR_CREATE_FROM_CPP_STRING( absl::StrCat("Certificate provider instance name: \"", root_provider_instance_name, "\" not recognized.")), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); } } if (root_certificate_provider_ != new_root_provider) { if (root_certificate_provider_ != nullptr && root_certificate_provider_->interested_parties() != nullptr) { grpc_pollset_set_del_pollset_set( interested_parties(), root_certificate_provider_->interested_parties()); } if (new_root_provider != nullptr && new_root_provider->interested_parties() != nullptr) { grpc_pollset_set_add_pollset_set(interested_parties(), new_root_provider->interested_parties()); } root_certificate_provider_ = std::move(new_root_provider); } xds_certificate_provider_->UpdateRootCertNameAndDistributor( cluster_name, root_provider_cert_name, root_certificate_provider_ == nullptr ? nullptr : root_certificate_provider_->distributor()); // Configure identity cert. absl::string_view identity_provider_instance_name = cluster_data.common_tls_context.tls_certificate_provider_instance .instance_name; absl::string_view identity_provider_cert_name = cluster_data.common_tls_context.tls_certificate_provider_instance .certificate_name; RefCountedPtr new_identity_provider; if (!identity_provider_instance_name.empty()) { new_identity_provider = xds_client_->certificate_provider_store() .CreateOrGetCertificateProvider(identity_provider_instance_name); if (new_identity_provider == nullptr) { return grpc_error_set_int( GRPC_ERROR_CREATE_FROM_CPP_STRING(absl::StrCat( "Certificate provider instance name: \"", identity_provider_instance_name, "\" not recognized.")), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); } } if (identity_certificate_provider_ != new_identity_provider) { if (identity_certificate_provider_ != nullptr && identity_certificate_provider_->interested_parties() != nullptr) { grpc_pollset_set_del_pollset_set( interested_parties(), identity_certificate_provider_->interested_parties()); } if (new_identity_provider != nullptr && new_identity_provider->interested_parties() != nullptr) { grpc_pollset_set_add_pollset_set( interested_parties(), new_identity_provider->interested_parties()); } identity_certificate_provider_ = std::move(new_identity_provider); } xds_certificate_provider_->UpdateIdentityCertNameAndDistributor( cluster_name, identity_provider_cert_name, identity_certificate_provider_ == nullptr ? nullptr : identity_certificate_provider_->distributor()); // Configure SAN matchers. const std::vector& match_subject_alt_names = cluster_data.common_tls_context.certificate_validation_context .match_subject_alt_names; xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers( cluster_name, match_subject_alt_names); return GRPC_ERROR_NONE; } void CdsLb::CancelClusterDataWatch(absl::string_view cluster_name, ClusterWatcher* watcher, bool delay_unsubscription) { if (xds_certificate_provider_ != nullptr) { std::string name(cluster_name); xds_certificate_provider_->UpdateRootCertNameAndDistributor(name, "", nullptr); xds_certificate_provider_->UpdateIdentityCertNameAndDistributor(name, "", nullptr); xds_certificate_provider_->UpdateSubjectAlternativeNameMatchers(name, {}); } XdsClusterResourceType::CancelWatch(xds_client_.get(), cluster_name, watcher, delay_unsubscription); } // // factory // class CdsLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { RefCountedPtr xds_client = XdsClient::GetFromChannelArgs(*args.args); if (xds_client == nullptr) { gpr_log(GPR_ERROR, "XdsClient not present in channel args -- cannot instantiate " "cds LB policy"); return nullptr; } return MakeOrphanable(std::move(xds_client), std::move(args)); } const char* name() const override { return kCds; } RefCountedPtr ParseLoadBalancingConfig( const Json& json, grpc_error_handle* error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json.type() == Json::Type::JSON_NULL) { // xds was mentioned as a policy in the deprecated loadBalancingPolicy // field or in the client API. *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:loadBalancingPolicy error:cds policy requires configuration. " "Please use loadBalancingConfig field of service config instead."); return nullptr; } std::vector error_list; // cluster name. std::string cluster; auto it = json.object_value().find("cluster"); if (it == json.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "required field 'cluster' not present")); } else if (it->second.type() != Json::Type::STRING) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:cluster error:type should be string")); } else { cluster = it->second.string_value(); } if (!error_list.empty()) { *error = GRPC_ERROR_CREATE_FROM_VECTOR("Cds Parser", &error_list); return nullptr; } return MakeRefCounted(std::move(cluster)); } }; } // namespace } // namespace grpc_core // // Plugin registration // void grpc_lb_policy_cds_init() { grpc_core::LoadBalancingPolicyRegistry::Builder:: RegisterLoadBalancingPolicyFactory( absl::make_unique()); } void grpc_lb_policy_cds_shutdown() {}