// // Copyright 2018 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "src/core/load_balancing/child_policy_handler.h" #include #include #include "absl/log/check.h" #include "absl/status/status.h" #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include #include #include #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/load_balancing/delegating_helper.h" #include "src/core/load_balancing/lb_policy_registry.h" #include "src/core/load_balancing/subchannel_interface.h" namespace grpc_core { // // ChildPolicyHandler::Helper // class ChildPolicyHandler::Helper final : public LoadBalancingPolicy::ParentOwningDelegatingChannelControlHelper< ChildPolicyHandler> { public: explicit Helper(RefCountedPtr parent) : ParentOwningDelegatingChannelControlHelper(std::move(parent)) {} RefCountedPtr CreateSubchannel( const grpc_resolved_address& address, const ChannelArgs& per_address_args, const ChannelArgs& args) override { if (parent()->shutting_down_) return nullptr; if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr; return parent()->channel_control_helper()->CreateSubchannel( address, per_address_args, args); } void UpdateState(grpc_connectivity_state state, const absl::Status& status, RefCountedPtr picker) override { if (parent()->shutting_down_) return; // If this request is from the pending child policy, ignore it until // it reports something other than CONNECTING, at which point we swap it // into place. if (CalledByPendingChild()) { if (GRPC_TRACE_FLAG_ENABLED(*(parent()->tracer_))) { gpr_log(GPR_INFO, "[child_policy_handler %p] helper %p: pending child policy %p " "reports state=%s (%s)", parent(), this, child_, ConnectivityStateName(state), status.ToString().c_str()); } if (state == GRPC_CHANNEL_CONNECTING) return; grpc_pollset_set_del_pollset_set( parent()->child_policy_->interested_parties(), parent()->interested_parties()); parent()->child_policy_ = std::move(parent()->pending_child_policy_); } else if (!CalledByCurrentChild()) { // This request is from an outdated child, so ignore it. return; } parent()->channel_control_helper()->UpdateState(state, status, std::move(picker)); } void RequestReresolution() override { if (parent()->shutting_down_) return; // Only forward re-resolution requests from the most recent child, // since that's the one that will be receiving any update we receive // from the resolver. const LoadBalancingPolicy* latest_child_policy = parent()->pending_child_policy_ != nullptr ? parent()->pending_child_policy_.get() : parent()->child_policy_.get(); if (child_ != latest_child_policy) return; if (GRPC_TRACE_FLAG_ENABLED(*(parent()->tracer_))) { gpr_log(GPR_INFO, "[child_policy_handler %p] requesting re-resolution", parent()); } parent()->channel_control_helper()->RequestReresolution(); } void AddTraceEvent(TraceSeverity severity, absl::string_view message) override { if (parent()->shutting_down_) return; if (!CalledByPendingChild() && !CalledByCurrentChild()) return; parent()->channel_control_helper()->AddTraceEvent(severity, message); } void set_child(LoadBalancingPolicy* child) { child_ = child; } private: bool CalledByPendingChild() const { CHECK_NE(child_, nullptr); return child_ == parent()->pending_child_policy_.get(); } bool CalledByCurrentChild() const { CHECK_NE(child_, nullptr); return child_ == parent()->child_policy_.get(); }; LoadBalancingPolicy* child_ = nullptr; }; // // ChildPolicyHandler // void ChildPolicyHandler::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down", this); } shutting_down_ = true; if (child_policy_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down lb_policy %p", this, child_policy_.get()); } grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), interested_parties()); child_policy_.reset(); } if (pending_child_policy_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] shutting down pending lb_policy %p", this, pending_child_policy_.get()); } grpc_pollset_set_del_pollset_set( pending_child_policy_->interested_parties(), interested_parties()); pending_child_policy_.reset(); } } absl::Status ChildPolicyHandler::UpdateLocked(UpdateArgs args) { // If the child policy name changes, we need to create a new child // policy. When this happens, we leave child_policy_ as-is and store // the new child policy in pending_child_policy_. Once the new child // policy transitions into state READY, we swap it into child_policy_, // replacing the original child policy. So pending_child_policy_ is // non-null only between when we apply an update that changes the child // policy name and when the new child reports state READY. // // Updates can arrive at any point during this transition. We always // apply updates relative to the most recently created child policy, // even if the most recent one is still in pending_child_policy_. This // is true both when applying the updates to an existing child policy // and when determining whether we need to create a new policy. // // As a result of this, there are several cases to consider here: // // 1. We have no existing child policy (i.e., this is the first update // we receive after being created; in this case, both child_policy_ // and pending_child_policy_ are null). In this case, we create a // new child policy and store it in child_policy_. // // 2. We have an existing child policy and have no pending child policy // from a previous update (i.e., either there has not been a // previous update that changed the policy name, or we have already // finished swapping in the new policy; in this case, child_policy_ // is non-null but pending_child_policy_ is null). In this case: // a. If going from the current config to the new config does not // require a new policy, then we update the existing child policy. // b. If going from the current config to the new config does require a // new policy, we create a new policy. The policy will be stored in // pending_child_policy_ and will later be swapped into // child_policy_ by the helper when the new child transitions // into state READY. // // 3. We have an existing child policy and have a pending child policy // from a previous update (i.e., a previous update set // pending_child_policy_ as per case 2b above and that policy has // not yet transitioned into state READY and been swapped into // child_policy_; in this case, both child_policy_ and // pending_child_policy_ are non-null). In this case: // a. If going from the current config to the new config does not // require a new policy, then we update the existing pending // child policy. // b. If going from the current config to the new config does require a // new child policy, then we create a new policy. The new // policy is stored in pending_child_policy_ (replacing the one // that was there before, which will be immediately shut down) // and will later be swapped into child_policy_ by the helper // when the new child transitions into state READY. const bool create_policy = // case 1 child_policy_ == nullptr || // cases 2b and 3b ConfigChangeRequiresNewPolicyInstance(current_config_.get(), args.config.get()); current_config_ = args.config; LoadBalancingPolicy* policy_to_update = nullptr; if (create_policy) { // Cases 1, 2b, and 3b: create a new child policy. // If child_policy_ is null, we set it (case 1), else we set // pending_child_policy_ (cases 2b and 3b). // TODO(roth): In cases 2b and 3b, we should start a timer here, so // that there's an upper bound on the amount of time it takes us to // switch to the new policy, even if the new policy stays in // CONNECTING for a very long period of time. if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] creating new %schild policy %s", this, child_policy_ == nullptr ? "" : "pending ", std::string(args.config->name()).c_str()); } auto& lb_policy = child_policy_ == nullptr ? child_policy_ : pending_child_policy_; lb_policy = CreateChildPolicy(args.config->name(), args.args); policy_to_update = lb_policy.get(); } else { // Cases 2a and 3a: update an existing policy. // If we have a pending child policy, send the update to the pending // policy (case 3a), else send it to the current policy (case 2a). policy_to_update = pending_child_policy_ != nullptr ? pending_child_policy_.get() : child_policy_.get(); } CHECK_NE(policy_to_update, nullptr); // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] updating %schild policy %p", this, policy_to_update == pending_child_policy_.get() ? "pending " : "", policy_to_update); } return policy_to_update->UpdateLocked(std::move(args)); } void ChildPolicyHandler::ExitIdleLocked() { if (child_policy_ != nullptr) { child_policy_->ExitIdleLocked(); if (pending_child_policy_ != nullptr) { pending_child_policy_->ExitIdleLocked(); } } } void ChildPolicyHandler::ResetBackoffLocked() { if (child_policy_ != nullptr) { child_policy_->ResetBackoffLocked(); if (pending_child_policy_ != nullptr) { pending_child_policy_->ResetBackoffLocked(); } } } OrphanablePtr ChildPolicyHandler::CreateChildPolicy( absl::string_view child_policy_name, const ChannelArgs& args) { Helper* helper = new Helper(RefAsSubclass(DEBUG_LOCATION, "Helper")); LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.work_serializer = work_serializer(); lb_policy_args.channel_control_helper = std::unique_ptr(helper); lb_policy_args.args = args; OrphanablePtr lb_policy = CreateLoadBalancingPolicy(child_policy_name, std::move(lb_policy_args)); if (GPR_UNLIKELY(lb_policy == nullptr)) { gpr_log(GPR_ERROR, "could not create LB policy \"%s\"", std::string(child_policy_name).c_str()); return nullptr; } helper->set_child(lb_policy.get()); if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[child_policy_handler %p] created new LB policy \"%s\" (%p)", this, std::string(child_policy_name).c_str(), lb_policy.get()); } channel_control_helper()->AddTraceEvent( ChannelControlHelper::TRACE_INFO, absl::StrCat("Created new LB policy \"", child_policy_name, "\"")); grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), interested_parties()); return lb_policy; } bool ChildPolicyHandler::ConfigChangeRequiresNewPolicyInstance( LoadBalancingPolicy::Config* old_config, LoadBalancingPolicy::Config* new_config) const { return old_config->name() != new_config->name(); } OrphanablePtr ChildPolicyHandler::CreateLoadBalancingPolicy( absl::string_view name, LoadBalancingPolicy::Args args) const { return CoreConfiguration::Get() .lb_policy_registry() .CreateLoadBalancingPolicy(name, std::move(args)); } } // namespace grpc_core