// // Copyright 2018 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include #include #include #include "absl/status/status.h" #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/resolver/xds/xds_resolver.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/transport/error_utils.h" #define GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS (15 * 60 * 1000) namespace grpc_core { TraceFlag grpc_xds_cluster_manager_lb_trace(false, "xds_cluster_manager_lb"); namespace { constexpr char kXdsClusterManager[] = "xds_cluster_manager_experimental"; // Config for xds_cluster_manager LB policy. class XdsClusterManagerLbConfig : public LoadBalancingPolicy::Config { public: using ClusterMap = std::map>; explicit XdsClusterManagerLbConfig(ClusterMap cluster_map) : cluster_map_(std::move(cluster_map)) {} const char* name() const override { return kXdsClusterManager; } const ClusterMap& cluster_map() const { return cluster_map_; } private: ClusterMap cluster_map_; }; // xds_cluster_manager LB policy. class XdsClusterManagerLb : public LoadBalancingPolicy { public: explicit XdsClusterManagerLb(Args args); const char* name() const override { return kXdsClusterManager; } void UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; private: // A simple wrapper for ref-counting a picker from the child policy. class ChildPickerWrapper : public RefCounted { public: ChildPickerWrapper(std::string name, std::unique_ptr picker) : name_(std::move(name)), picker_(std::move(picker)) {} PickResult Pick(PickArgs args) { return picker_->Pick(args); } const std::string& name() const { return name_; } private: std::string name_; std::unique_ptr picker_; }; // Picks a child using prefix or path matching and then delegates to that // child's picker. class ClusterPicker : public SubchannelPicker { public: // Maintains a map of cluster names to pickers. using ClusterMap = std::map>; // It is required that the keys of cluster_map have to live at least as long // as the ClusterPicker instance. explicit ClusterPicker(ClusterMap cluster_map) : cluster_map_(std::move(cluster_map)) {} PickResult Pick(PickArgs args) override; private: ClusterMap cluster_map_; }; // Each ClusterChild holds a ref to its parent XdsClusterManagerLb. class ClusterChild : public InternallyRefCounted { public: ClusterChild(RefCountedPtr xds_cluster_manager_policy, const std::string& name); ~ClusterChild() override; void Orphan() override; void UpdateLocked(RefCountedPtr config, const ServerAddressList& addresses, const grpc_channel_args* args); void ExitIdleLocked(); void ResetBackoffLocked(); void DeactivateLocked(); grpc_connectivity_state connectivity_state() const { return connectivity_state_; } RefCountedPtr picker_wrapper() const { return picker_wrapper_; } private: class Helper : public ChannelControlHelper { public: explicit Helper(RefCountedPtr xds_cluster_manager_child) : xds_cluster_manager_child_(std::move(xds_cluster_manager_child)) {} ~Helper() override { xds_cluster_manager_child_.reset(DEBUG_LOCATION, "Helper"); } RefCountedPtr CreateSubchannel( ServerAddress address, const grpc_channel_args& args) override; void UpdateState(grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) override; void RequestReresolution() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; private: RefCountedPtr xds_cluster_manager_child_; }; // Methods for dealing with the child policy. OrphanablePtr CreateChildPolicyLocked( const grpc_channel_args* args); static void OnDelayedRemovalTimer(void* arg, grpc_error* error); void OnDelayedRemovalTimerLocked(grpc_error* error); // The owning LB policy. RefCountedPtr xds_cluster_manager_policy_; // Points to the corresponding key in children map. const std::string name_; OrphanablePtr child_policy_; RefCountedPtr picker_wrapper_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_IDLE; bool seen_failure_since_ready_ = false; // States for delayed removal. grpc_timer delayed_removal_timer_; grpc_closure on_delayed_removal_timer_; bool delayed_removal_timer_callback_pending_ = false; bool shutdown_ = false; }; ~XdsClusterManagerLb() override; void ShutdownLocked() override; void UpdateStateLocked(); // Current config from the resolver. RefCountedPtr config_; // Internal state. bool shutting_down_ = false; // Children. std::map> children_; }; // // XdsClusterManagerLb::ClusterPicker // XdsClusterManagerLb::PickResult XdsClusterManagerLb::ClusterPicker::Pick( PickArgs args) { auto cluster_name = args.call_state->ExperimentalGetCallAttribute(kXdsClusterAttribute); auto it = cluster_map_.find(cluster_name); if (it != cluster_map_.end()) { return it->second->Pick(args); } PickResult result; result.type = PickResult::PICK_FAILED; result.error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("xds cluster manager picker: unknown cluster \"", cluster_name, "\"") .c_str()), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_INTERNAL); return result; } // // XdsClusterManagerLb // XdsClusterManagerLb::XdsClusterManagerLb(Args args) : LoadBalancingPolicy(std::move(args)) {} XdsClusterManagerLb::~XdsClusterManagerLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log( GPR_INFO, "[xds_cluster_manager_lb %p] destroying xds_cluster_manager LB policy", this); } } void XdsClusterManagerLb::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] shutting down", this); } shutting_down_ = true; children_.clear(); } void XdsClusterManagerLb::ExitIdleLocked() { for (auto& p : children_) p.second->ExitIdleLocked(); } void XdsClusterManagerLb::ResetBackoffLocked() { for (auto& p : children_) p.second->ResetBackoffLocked(); } void XdsClusterManagerLb::UpdateLocked(UpdateArgs args) { if (shutting_down_) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] Received update", this); } // Update config. config_ = std::move(args.config); // Deactivate the children not in the new config. for (const auto& p : children_) { const std::string& name = p.first; ClusterChild* child = p.second.get(); if (config_->cluster_map().find(name) == config_->cluster_map().end()) { child->DeactivateLocked(); } } // Add or update the children in the new config. for (const auto& p : config_->cluster_map()) { const std::string& name = p.first; const RefCountedPtr& config = p.second; auto it = children_.find(name); if (it == children_.end()) { it = children_ .emplace(name, MakeOrphanable( Ref(DEBUG_LOCATION, "ClusterChild"), name)) .first; } it->second->UpdateLocked(config, args.addresses, args.args); } UpdateStateLocked(); } void XdsClusterManagerLb::UpdateStateLocked() { // Also count the number of children in each state, to determine the // overall state. size_t num_ready = 0; size_t num_connecting = 0; size_t num_idle = 0; size_t num_transient_failures = 0; for (const auto& p : children_) { const auto& child_name = p.first; const ClusterChild* child = p.second.get(); // Skip the children that are not in the latest update. if (config_->cluster_map().find(child_name) == config_->cluster_map().end()) { continue; } switch (child->connectivity_state()) { case GRPC_CHANNEL_READY: { ++num_ready; break; } case GRPC_CHANNEL_CONNECTING: { ++num_connecting; break; } case GRPC_CHANNEL_IDLE: { ++num_idle; break; } case GRPC_CHANNEL_TRANSIENT_FAILURE: { ++num_transient_failures; break; } default: GPR_UNREACHABLE_CODE(return ); } } // Determine aggregated connectivity state. grpc_connectivity_state connectivity_state; if (num_ready > 0) { connectivity_state = GRPC_CHANNEL_READY; } else if (num_connecting > 0) { connectivity_state = GRPC_CHANNEL_CONNECTING; } else if (num_idle > 0) { connectivity_state = GRPC_CHANNEL_IDLE; } else { connectivity_state = GRPC_CHANNEL_TRANSIENT_FAILURE; } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] connectivity changed to %s", this, ConnectivityStateName(connectivity_state)); } std::unique_ptr picker; absl::Status status; switch (connectivity_state) { case GRPC_CHANNEL_READY: { ClusterPicker::ClusterMap cluster_map; for (const auto& p : config_->cluster_map()) { const std::string& cluster_name = p.first; RefCountedPtr& child_picker = cluster_map[cluster_name]; child_picker = children_[cluster_name]->picker_wrapper(); if (child_picker == nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log( GPR_INFO, "[xds_cluster_manager_lb %p] child %s has not yet returned a " "picker; creating a QueuePicker.", this, cluster_name.c_str()); } child_picker = MakeRefCounted( cluster_name, absl::make_unique( Ref(DEBUG_LOCATION, "QueuePicker"))); } } picker = absl::make_unique(std::move(cluster_map)); break; } case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE: picker = absl::make_unique(Ref(DEBUG_LOCATION, "QueuePicker")); break; default: grpc_error* error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING( "TRANSIENT_FAILURE from XdsClusterManagerLb"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); status = grpc_error_to_absl_status(error); picker = absl::make_unique(error); } channel_control_helper()->UpdateState(connectivity_state, status, std::move(picker)); } // // XdsClusterManagerLb::ClusterChild // XdsClusterManagerLb::ClusterChild::ClusterChild( RefCountedPtr xds_cluster_manager_policy, const std::string& name) : xds_cluster_manager_policy_(std::move(xds_cluster_manager_policy)), name_(name) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] created ClusterChild %p for %s", xds_cluster_manager_policy_.get(), this, name_.c_str()); } GRPC_CLOSURE_INIT(&on_delayed_removal_timer_, OnDelayedRemovalTimer, this, grpc_schedule_on_exec_ctx); } XdsClusterManagerLb::ClusterChild::~ClusterChild() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] ClusterChild %p: destroying " "child", xds_cluster_manager_policy_.get(), this); } xds_cluster_manager_policy_.reset(DEBUG_LOCATION, "ClusterChild"); } void XdsClusterManagerLb::ClusterChild::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] ClusterChild %p %s: " "shutting down child", xds_cluster_manager_policy_.get(), this, name_.c_str()); } // Remove the child policy's interested_parties pollset_set from the // xDS policy. grpc_pollset_set_del_pollset_set( child_policy_->interested_parties(), xds_cluster_manager_policy_->interested_parties()); child_policy_.reset(); // Drop our ref to the child's picker, in case it's holding a ref to // the child. picker_wrapper_.reset(); if (delayed_removal_timer_callback_pending_) { grpc_timer_cancel(&delayed_removal_timer_); } shutdown_ = true; Unref(); } OrphanablePtr XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked( const grpc_channel_args* args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.work_serializer = xds_cluster_manager_policy_->work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = absl::make_unique(this->Ref(DEBUG_LOCATION, "Helper")); OrphanablePtr lb_policy = MakeOrphanable(std::move(lb_policy_args), &grpc_xds_cluster_manager_lb_trace); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] ClusterChild %p %s: Created " "new child " "policy handler %p", xds_cluster_manager_policy_.get(), this, name_.c_str(), lb_policy.get()); } // Add the xDS's interested_parties pollset_set to that of the newly created // child policy. This will make the child policy progress upon activity on // xDS LB, which in turn is tied to the application's call. grpc_pollset_set_add_pollset_set( lb_policy->interested_parties(), xds_cluster_manager_policy_->interested_parties()); return lb_policy; } void XdsClusterManagerLb::ClusterChild::UpdateLocked( RefCountedPtr config, const ServerAddressList& addresses, const grpc_channel_args* args) { if (xds_cluster_manager_policy_->shutting_down_) return; // Update child weight. // Reactivate if needed. if (delayed_removal_timer_callback_pending_) { delayed_removal_timer_callback_pending_ = false; grpc_timer_cancel(&delayed_removal_timer_); } // Create child policy if needed. if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args); } // Construct update args. UpdateArgs update_args; update_args.config = std::move(config); update_args.addresses = addresses; update_args.args = grpc_channel_args_copy(args); // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log(GPR_INFO, "[xds_cluster_manager_lb %p] ClusterChild %p %s: " "Updating child " "policy handler %p", xds_cluster_manager_policy_.get(), this, name_.c_str(), child_policy_.get()); } child_policy_->UpdateLocked(std::move(update_args)); } void XdsClusterManagerLb::ClusterChild::ExitIdleLocked() { child_policy_->ExitIdleLocked(); } void XdsClusterManagerLb::ClusterChild::ResetBackoffLocked() { child_policy_->ResetBackoffLocked(); } void XdsClusterManagerLb::ClusterChild::DeactivateLocked() { // If already deactivated, don't do that again. if (delayed_removal_timer_callback_pending_ == true) return; // Set the child weight to 0 so that future picker won't contain this child. // Start a timer to delete the child. Ref(DEBUG_LOCATION, "ClusterChild+timer").release(); grpc_timer_init(&delayed_removal_timer_, ExecCtx::Get()->Now() + GRPC_XDS_CLUSTER_MANAGER_CHILD_RETENTION_INTERVAL_MS, &on_delayed_removal_timer_); delayed_removal_timer_callback_pending_ = true; } void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimer( void* arg, grpc_error* error) { ClusterChild* self = static_cast(arg); GRPC_ERROR_REF(error); // Ref owned by the lambda self->xds_cluster_manager_policy_->work_serializer()->Run( [self, error]() { self->OnDelayedRemovalTimerLocked(error); }, DEBUG_LOCATION); } void XdsClusterManagerLb::ClusterChild::OnDelayedRemovalTimerLocked( grpc_error* error) { delayed_removal_timer_callback_pending_ = false; if (error == GRPC_ERROR_NONE && !shutdown_) { xds_cluster_manager_policy_->children_.erase(name_); } Unref(DEBUG_LOCATION, "ClusterChild+timer"); GRPC_ERROR_UNREF(error); } // // XdsClusterManagerLb::ClusterChild::Helper // RefCountedPtr XdsClusterManagerLb::ClusterChild::Helper::CreateSubchannel( ServerAddress address, const grpc_channel_args& args) { if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) { return nullptr; } return xds_cluster_manager_child_->xds_cluster_manager_policy_ ->channel_control_helper() ->CreateSubchannel(std::move(address), args); } void XdsClusterManagerLb::ClusterChild::Helper::UpdateState( grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_cluster_manager_lb_trace)) { gpr_log( GPR_INFO, "[xds_cluster_manager_lb %p] child %s: received update: state=%s (%s) " "picker=%p", xds_cluster_manager_child_->xds_cluster_manager_policy_.get(), xds_cluster_manager_child_->name_.c_str(), ConnectivityStateName(state), status.ToString().c_str(), picker.get()); } if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) { return; } // Cache the picker in the ClusterChild. xds_cluster_manager_child_->picker_wrapper_ = MakeRefCounted(xds_cluster_manager_child_->name_, std::move(picker)); // Decide what state to report for aggregation purposes. // If we haven't seen a failure since the last time we were in state // READY, then we report the state change as-is. However, once we do see // a failure, we report TRANSIENT_FAILURE and ignore any subsequent state // changes until we go back into state READY. if (!xds_cluster_manager_child_->seen_failure_since_ready_) { if (state == GRPC_CHANNEL_TRANSIENT_FAILURE) { xds_cluster_manager_child_->seen_failure_since_ready_ = true; } } else { if (state != GRPC_CHANNEL_READY) return; xds_cluster_manager_child_->seen_failure_since_ready_ = false; } xds_cluster_manager_child_->connectivity_state_ = state; // Notify the LB policy. xds_cluster_manager_child_->xds_cluster_manager_policy_->UpdateStateLocked(); } void XdsClusterManagerLb::ClusterChild::Helper::RequestReresolution() { if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) { return; } xds_cluster_manager_child_->xds_cluster_manager_policy_ ->channel_control_helper() ->RequestReresolution(); } void XdsClusterManagerLb::ClusterChild::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (xds_cluster_manager_child_->xds_cluster_manager_policy_->shutting_down_) { return; } xds_cluster_manager_child_->xds_cluster_manager_policy_ ->channel_control_helper() ->AddTraceEvent(severity, message); } // // factory // class XdsClusterManagerLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); } const char* name() const override { return kXdsClusterManager; } RefCountedPtr ParseLoadBalancingConfig( const Json& json, grpc_error** error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json.type() == Json::Type::JSON_NULL) { // xds_cluster_manager was mentioned as a policy in the deprecated // loadBalancingPolicy field or in the client API. *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:loadBalancingPolicy error:xds_cluster_manager policy requires " "configuration. Please use loadBalancingConfig field of service " "config instead."); return nullptr; } std::vector error_list; XdsClusterManagerLbConfig::ClusterMap cluster_map; std::set clusters_to_be_used; auto it = json.object_value().find("children"); if (it == json.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:children error:required field not present")); } else if (it->second.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:children error:type should be object")); } else { for (const auto& p : it->second.object_value()) { const std::string& child_name = p.first; if (child_name.empty()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:children element error: name cannot be empty")); continue; } RefCountedPtr child_config; std::vector child_errors = ParseChildConfig(p.second, &child_config); if (!child_errors.empty()) { // Can't use GRPC_ERROR_CREATE_FROM_VECTOR() here, because the error // string is not static in this case. grpc_error* error = GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:children name:", child_name).c_str()); for (grpc_error* child_error : child_errors) { error = grpc_error_add_child(error, child_error); } error_list.push_back(error); } else { cluster_map[child_name] = std::move(child_config); clusters_to_be_used.insert(child_name); } } } if (cluster_map.empty()) { error_list.push_back( GRPC_ERROR_CREATE_FROM_STATIC_STRING("no valid children configured")); } if (!error_list.empty()) { *error = GRPC_ERROR_CREATE_FROM_VECTOR( "xds_cluster_manager_experimental LB policy config", &error_list); return nullptr; } return MakeRefCounted(std::move(cluster_map)); } private: static std::vector ParseChildConfig( const Json& json, RefCountedPtr* child_config) { std::vector error_list; if (json.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "value should be of type object")); return error_list; } auto it = json.object_value().find("childPolicy"); if (it == json.object_value().end()) { error_list.push_back( GRPC_ERROR_CREATE_FROM_STATIC_STRING("did not find childPolicy")); } else { grpc_error* parse_error = GRPC_ERROR_NONE; *child_config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( it->second, &parse_error); if (*child_config == nullptr) { GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); std::vector child_errors; child_errors.push_back(parse_error); error_list.push_back( GRPC_ERROR_CREATE_FROM_VECTOR("field:childPolicy", &child_errors)); } } return error_list; } }; } // namespace } // namespace grpc_core // // Plugin registration // void grpc_lb_policy_xds_cluster_manager_init() { grpc_core::LoadBalancingPolicyRegistry::Builder:: RegisterLoadBalancingPolicyFactory( absl::make_unique()); } void grpc_lb_policy_xds_cluster_manager_shutdown() {}