// // Copyright 2018 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include #include #include "absl/strings/str_cat.h" #include "absl/strings/str_format.h" #include #include "src/core/ext/filters/client_channel/lb_policy.h" #include "src/core/ext/filters/client_channel/lb_policy/address_filtering.h" #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/ext/filters/client_channel/lb_policy_factory.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/timer.h" #include "src/core/lib/iomgr/work_serializer.h" #include "src/core/lib/transport/error_utils.h" namespace grpc_core { TraceFlag grpc_lb_priority_trace(false, "priority_lb"); namespace { constexpr char kPriority[] = "priority_experimental"; // How long we keep a child around for after it is no longer being used // (either because it has been removed from the config or because we // have switched to a higher-priority child). constexpr int kChildRetentionIntervalMs = 15 * 60 * 1000; // Default for how long we wait for a newly created child to get connected // before starting to attempt the next priority. Overridable via channel arg. constexpr int kDefaultChildFailoverTimeoutMs = 10000; // Config for priority LB policy. class PriorityLbConfig : public LoadBalancingPolicy::Config { public: struct PriorityLbChild { RefCountedPtr config; bool ignore_reresolution_requests = false; }; PriorityLbConfig(std::map children, std::vector priorities) : children_(std::move(children)), priorities_(std::move(priorities)) {} const char* name() const override { return kPriority; } const std::map& children() const { return children_; } const std::vector& priorities() const { return priorities_; } private: const std::map children_; const std::vector priorities_; }; // priority LB policy. class PriorityLb : public LoadBalancingPolicy { public: explicit PriorityLb(Args args); const char* name() const override { return kPriority; } void UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; private: // Each ChildPriority holds a ref to the PriorityLb. class ChildPriority : public InternallyRefCounted { public: ChildPriority(RefCountedPtr priority_policy, std::string name); ~ChildPriority() override { priority_policy_.reset(DEBUG_LOCATION, "ChildPriority"); } const std::string& name() const { return name_; } void UpdateLocked(RefCountedPtr config, bool ignore_reresolution_requests); void ExitIdleLocked(); void ResetBackoffLocked(); void DeactivateLocked(); void MaybeReactivateLocked(); void MaybeCancelFailoverTimerLocked(); void Orphan() override; std::unique_ptr GetPicker() { return absl::make_unique(picker_wrapper_); } grpc_connectivity_state connectivity_state() const { return connectivity_state_; } const absl::Status& connectivity_status() const { return connectivity_status_; } bool failover_timer_callback_pending() const { return failover_timer_callback_pending_; } private: // A simple wrapper for ref-counting a picker from the child policy. class RefCountedPicker : public RefCounted { public: explicit RefCountedPicker(std::unique_ptr picker) : picker_(std::move(picker)) {} PickResult Pick(PickArgs args) { return picker_->Pick(args); } private: std::unique_ptr picker_; }; // A non-ref-counted wrapper for RefCountedPicker. class RefCountedPickerWrapper : public SubchannelPicker { public: explicit RefCountedPickerWrapper(RefCountedPtr picker) : picker_(std::move(picker)) {} PickResult Pick(PickArgs args) override { return picker_->Pick(args); } private: RefCountedPtr picker_; }; class Helper : public ChannelControlHelper { public: explicit Helper(RefCountedPtr priority) : priority_(std::move(priority)) {} ~Helper() override { priority_.reset(DEBUG_LOCATION, "Helper"); } RefCountedPtr CreateSubchannel( ServerAddress address, const grpc_channel_args& args) override; void UpdateState(grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) override; void RequestReresolution() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; private: RefCountedPtr priority_; }; // Methods for dealing with the child policy. OrphanablePtr CreateChildPolicyLocked( const grpc_channel_args* args); void OnConnectivityStateUpdateLocked( grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker); void StartFailoverTimerLocked(); static void OnFailoverTimer(void* arg, grpc_error* error); void OnFailoverTimerLocked(grpc_error* error); static void OnDeactivationTimer(void* arg, grpc_error* error); void OnDeactivationTimerLocked(grpc_error* error); RefCountedPtr priority_policy_; const std::string name_; bool ignore_reresolution_requests_ = false; OrphanablePtr child_policy_; grpc_connectivity_state connectivity_state_ = GRPC_CHANNEL_CONNECTING; absl::Status connectivity_status_; RefCountedPtr picker_wrapper_; // States for delayed removal. grpc_timer deactivation_timer_; grpc_closure on_deactivation_timer_; bool deactivation_timer_callback_pending_ = false; // States of failover. grpc_timer failover_timer_; grpc_closure on_failover_timer_; bool failover_timer_callback_pending_ = false; }; ~PriorityLb() override; void ShutdownLocked() override; // Returns UINT32_MAX if child is not in current priority list. uint32_t GetChildPriorityLocked(const std::string& child_name) const; void HandleChildConnectivityStateChangeLocked(ChildPriority* child); void DeleteChild(ChildPriority* child); void TryNextPriorityLocked(bool report_connecting); void SelectPriorityLocked(uint32_t priority); const int child_failover_timeout_ms_; // Current channel args and config from the resolver. const grpc_channel_args* args_ = nullptr; RefCountedPtr config_; HierarchicalAddressMap addresses_; // Internal state. bool shutting_down_ = false; std::map> children_; // The priority that is being used. uint32_t current_priority_ = UINT32_MAX; // Points to the current child from before the most recent update. // We will continue to use this child until we decide which of the new // children to use. ChildPriority* current_child_from_before_update_ = nullptr; }; // // PriorityLb // PriorityLb::PriorityLb(Args args) : LoadBalancingPolicy(std::move(args)), child_failover_timeout_ms_(grpc_channel_args_find_integer( args.args, GRPC_ARG_PRIORITY_FAILOVER_TIMEOUT_MS, {kDefaultChildFailoverTimeoutMs, 0, INT_MAX})) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] created", this); } } PriorityLb::~PriorityLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] destroying priority LB policy", this); } grpc_channel_args_destroy(args_); } void PriorityLb::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] shutting down", this); } shutting_down_ = true; children_.clear(); } void PriorityLb::ExitIdleLocked() { if (current_priority_ != UINT32_MAX) { const std::string& child_name = config_->priorities()[current_priority_]; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] exiting IDLE for current priority %d child %s", this, current_priority_, child_name.c_str()); } children_[child_name]->ExitIdleLocked(); } } void PriorityLb::ResetBackoffLocked() { for (const auto& p : children_) p.second->ResetBackoffLocked(); } void PriorityLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] received update", this); } // Save current child. if (current_priority_ != UINT32_MAX) { const std::string& child_name = config_->priorities()[current_priority_]; current_child_from_before_update_ = children_[child_name].get(); // Unset current_priority_, since it was an index into the old // config's priority list and may no longer be valid. It will be // reset later by TryNextPriorityLocked(), but we unset it here in // case updating any of our children triggers a state update. current_priority_ = UINT32_MAX; } // Update config. config_ = std::move(args.config); // Update args. grpc_channel_args_destroy(args_); args_ = args.args; args.args = nullptr; // Update addresses. addresses_ = MakeHierarchicalAddressMap(args.addresses); // Check all existing children against the new config. for (const auto& p : children_) { const std::string& child_name = p.first; auto& child = p.second; auto config_it = config_->children().find(child_name); if (config_it == config_->children().end()) { // Existing child not found in new config. Deactivate it. child->DeactivateLocked(); } else { // Existing child found in new config. Update it. child->UpdateLocked(config_it->second.config, config_it->second.ignore_reresolution_requests); } } // Try to get connected. TryNextPriorityLocked(/*report_connecting=*/children_.empty()); } uint32_t PriorityLb::GetChildPriorityLocked( const std::string& child_name) const { for (uint32_t priority = 0; priority < config_->priorities().size(); ++priority) { if (config_->priorities()[priority] == child_name) return priority; } return UINT32_MAX; } void PriorityLb::HandleChildConnectivityStateChangeLocked( ChildPriority* child) { // Special case for the child that was the current child before the // most recent update. if (child == current_child_from_before_update_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] state update for current child from before " "config update", this); } if (child->connectivity_state() == GRPC_CHANNEL_READY || child->connectivity_state() == GRPC_CHANNEL_IDLE) { // If it's still READY or IDLE, we stick with this child, so pass // the new picker up to our parent. channel_control_helper()->UpdateState(child->connectivity_state(), child->connectivity_status(), child->GetPicker()); } else { // If it's no longer READY or IDLE, we should stop using it. // We already started trying other priorities as a result of the // update, but calling TryNextPriorityLocked() ensures that we will // properly select between CONNECTING and TRANSIENT_FAILURE as the // new state to report to our parent. current_child_from_before_update_ = nullptr; TryNextPriorityLocked(/*report_connecting=*/true); } return; } // Otherwise, find the child's priority. uint32_t child_priority = GetChildPriorityLocked(child->name()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] state update for priority %u, child %s, current " "priority %u", this, child_priority, child->name().c_str(), current_priority_); } // Ignore priorities not in the current config. if (child_priority == UINT32_MAX) return; // Ignore lower-than-current priorities. if (child_priority > current_priority_) return; // If a child reports TRANSIENT_FAILURE, start trying the next priority. // Note that even if this is for a higher-than-current priority, we // may still need to create some children between this priority and // the current one (e.g., if we got an update that inserted new // priorities ahead of the current one). if (child->connectivity_state() == GRPC_CHANNEL_TRANSIENT_FAILURE) { TryNextPriorityLocked( /*report_connecting=*/child_priority == current_priority_); return; } // The update is for a higher-than-current priority (or for any // priority if we don't have any current priority). if (child_priority < current_priority_) { // If the child reports READY or IDLE, switch to that priority. // Otherwise, ignore the update. if (child->connectivity_state() == GRPC_CHANNEL_READY || child->connectivity_state() == GRPC_CHANNEL_IDLE) { SelectPriorityLocked(child_priority); } return; } // The current priority has returned a new picker, so pass it up to // our parent. channel_control_helper()->UpdateState(child->connectivity_state(), child->connectivity_status(), child->GetPicker()); } void PriorityLb::DeleteChild(ChildPriority* child) { // If this was the current child from before the most recent update, // stop using it. We already started trying other priorities as a // result of the update, but calling TryNextPriorityLocked() ensures that // we will properly select between CONNECTING and TRANSIENT_FAILURE as the // new state to report to our parent. if (current_child_from_before_update_ == child) { current_child_from_before_update_ = nullptr; TryNextPriorityLocked(/*report_connecting=*/true); } children_.erase(child->name()); } void PriorityLb::TryNextPriorityLocked(bool report_connecting) { current_priority_ = UINT32_MAX; for (uint32_t priority = 0; priority < config_->priorities().size(); ++priority) { // If the child for the priority does not exist yet, create it. const std::string& child_name = config_->priorities()[priority]; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] trying priority %u, child %s", this, priority, child_name.c_str()); } auto& child = children_[child_name]; if (child == nullptr) { if (report_connecting) { channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, absl::Status(), absl::make_unique(Ref(DEBUG_LOCATION, "QueuePicker"))); } child = MakeOrphanable( Ref(DEBUG_LOCATION, "ChildPriority"), child_name); auto child_config = config_->children().find(child_name); GPR_DEBUG_ASSERT(child_config != config_->children().end()); child->UpdateLocked(child_config->second.config, child_config->second.ignore_reresolution_requests); return; } // The child already exists. child->MaybeReactivateLocked(); // If the child is in state READY or IDLE, switch to it. if (child->connectivity_state() == GRPC_CHANNEL_READY || child->connectivity_state() == GRPC_CHANNEL_IDLE) { SelectPriorityLocked(priority); return; } // Child is not READY or IDLE. // If its failover timer is still pending, give it time to fire. if (child->failover_timer_callback_pending()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] priority %u, child %s: child still " "attempting to connect, will wait", this, priority, child_name.c_str()); } if (report_connecting) { channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, absl::Status(), absl::make_unique(Ref(DEBUG_LOCATION, "QueuePicker"))); } return; } // Child has been failing for a while. Move on to the next priority. } // If there are no more priorities to try, report TRANSIENT_FAILURE. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] no priority reachable, putting channel in " "TRANSIENT_FAILURE", this); } current_child_from_before_update_ = nullptr; grpc_error* error = grpc_error_set_int( GRPC_ERROR_CREATE_FROM_STATIC_STRING("no ready priority"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error), absl::make_unique(error)); } void PriorityLb::SelectPriorityLocked(uint32_t priority) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] selected priority %u, child %s", this, priority, config_->priorities()[priority].c_str()); } current_priority_ = priority; current_child_from_before_update_ = nullptr; // Deactivate lower priorities. for (uint32_t p = priority + 1; p < config_->priorities().size(); ++p) { const std::string& child_name = config_->priorities()[p]; auto it = children_.find(child_name); if (it != children_.end()) it->second->DeactivateLocked(); } // Update picker. auto& child = children_[config_->priorities()[priority]]; channel_control_helper()->UpdateState(child->connectivity_state(), child->connectivity_status(), child->GetPicker()); } // // PriorityLb::ChildPriority // PriorityLb::ChildPriority::ChildPriority( RefCountedPtr priority_policy, std::string name) : priority_policy_(std::move(priority_policy)), name_(std::move(name)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] creating child %s (%p)", priority_policy_.get(), name_.c_str(), this); } GRPC_CLOSURE_INIT(&on_failover_timer_, OnFailoverTimer, this, grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&on_deactivation_timer_, OnDeactivationTimer, this, grpc_schedule_on_exec_ctx); // Start the failover timer. StartFailoverTimerLocked(); } void PriorityLb::ChildPriority::Orphan() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): orphaned", priority_policy_.get(), name_.c_str(), this); } MaybeCancelFailoverTimerLocked(); if (deactivation_timer_callback_pending_) { grpc_timer_cancel(&deactivation_timer_); } // Remove the child policy's interested_parties pollset_set from the // xDS policy. grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), priority_policy_->interested_parties()); child_policy_.reset(); // Drop our ref to the child's picker, in case it's holding a ref to // the child. picker_wrapper_.reset(); if (deactivation_timer_callback_pending_) { grpc_timer_cancel(&deactivation_timer_); } Unref(DEBUG_LOCATION, "ChildPriority+Orphan"); } void PriorityLb::ChildPriority::UpdateLocked( RefCountedPtr config, bool ignore_reresolution_requests) { if (priority_policy_->shutting_down_) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): start update", priority_policy_.get(), name_.c_str(), this); } ignore_reresolution_requests_ = ignore_reresolution_requests; // Create policy if needed. if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(priority_policy_->args_); } // Construct update args. UpdateArgs update_args; update_args.config = std::move(config); update_args.addresses = priority_policy_->addresses_[name_]; update_args.args = grpc_channel_args_copy(priority_policy_->args_); // Update the policy. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): updating child policy handler %p", priority_policy_.get(), name_.c_str(), this, child_policy_.get()); } child_policy_->UpdateLocked(std::move(update_args)); } OrphanablePtr PriorityLb::ChildPriority::CreateChildPolicyLocked( const grpc_channel_args* args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.work_serializer = priority_policy_->work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = absl::make_unique(this->Ref(DEBUG_LOCATION, "Helper")); OrphanablePtr lb_policy = MakeOrphanable(std::move(lb_policy_args), &grpc_lb_priority_trace); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): created new child policy " "handler %p", priority_policy_.get(), name_.c_str(), this, lb_policy.get()); } // Add the parent's interested_parties pollset_set to that of the newly // created child policy. This will make the child policy progress upon // activity on the parent LB, which in turn is tied to the application's call. grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), priority_policy_->interested_parties()); return lb_policy; } void PriorityLb::ChildPriority::ExitIdleLocked() { if (connectivity_state_ == GRPC_CHANNEL_IDLE && !failover_timer_callback_pending_) { StartFailoverTimerLocked(); } child_policy_->ExitIdleLocked(); } void PriorityLb::ChildPriority::ResetBackoffLocked() { child_policy_->ResetBackoffLocked(); } void PriorityLb::ChildPriority::OnConnectivityStateUpdateLocked( grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): state update: %s (%s) picker %p", priority_policy_.get(), name_.c_str(), this, ConnectivityStateName(state), status.ToString().c_str(), picker.get()); } // Store the state and picker. connectivity_state_ = state; connectivity_status_ = status; picker_wrapper_ = MakeRefCounted(std::move(picker)); // If READY or TRANSIENT_FAILURE, cancel failover timer. if (state == GRPC_CHANNEL_READY || state == GRPC_CHANNEL_TRANSIENT_FAILURE) { MaybeCancelFailoverTimerLocked(); } // Notify the parent policy. priority_policy_->HandleChildConnectivityStateChangeLocked(this); } void PriorityLb::ChildPriority::StartFailoverTimerLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): starting failover timer for %d ms", priority_policy_.get(), name_.c_str(), this, priority_policy_->child_failover_timeout_ms_); } Ref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked").release(); grpc_timer_init( &failover_timer_, ExecCtx::Get()->Now() + priority_policy_->child_failover_timeout_ms_, &on_failover_timer_); failover_timer_callback_pending_ = true; } void PriorityLb::ChildPriority::MaybeCancelFailoverTimerLocked() { if (failover_timer_callback_pending_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): cancelling failover timer", priority_policy_.get(), name_.c_str(), this); } grpc_timer_cancel(&failover_timer_); failover_timer_callback_pending_ = false; } } void PriorityLb::ChildPriority::OnFailoverTimer(void* arg, grpc_error* error) { ChildPriority* self = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda self->priority_policy_->work_serializer()->Run( [self, error]() { self->OnFailoverTimerLocked(error); }, DEBUG_LOCATION); } void PriorityLb::ChildPriority::OnFailoverTimerLocked(grpc_error* error) { if (error == GRPC_ERROR_NONE && failover_timer_callback_pending_ && !priority_policy_->shutting_down_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): failover timer fired, " "reporting TRANSIENT_FAILURE", priority_policy_.get(), name_.c_str(), this); } failover_timer_callback_pending_ = false; OnConnectivityStateUpdateLocked( GRPC_CHANNEL_TRANSIENT_FAILURE, absl::Status(absl::StatusCode::kUnavailable, "failover timer fired"), nullptr); } Unref(DEBUG_LOCATION, "ChildPriority+OnFailoverTimerLocked"); GRPC_ERROR_UNREF(error); } void PriorityLb::ChildPriority::DeactivateLocked() { // If already deactivated, don't do it again. if (deactivation_timer_callback_pending_) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): deactivating -- will remove in %d " "ms.", priority_policy_.get(), name_.c_str(), this, kChildRetentionIntervalMs); } MaybeCancelFailoverTimerLocked(); // Start a timer to delete the child. Ref(DEBUG_LOCATION, "ChildPriority+timer").release(); grpc_timer_init(&deactivation_timer_, ExecCtx::Get()->Now() + kChildRetentionIntervalMs, &on_deactivation_timer_); deactivation_timer_callback_pending_ = true; } void PriorityLb::ChildPriority::MaybeReactivateLocked() { if (deactivation_timer_callback_pending_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): reactivating", priority_policy_.get(), name_.c_str(), this); } deactivation_timer_callback_pending_ = false; grpc_timer_cancel(&deactivation_timer_); } } void PriorityLb::ChildPriority::OnDeactivationTimer(void* arg, grpc_error* error) { ChildPriority* self = static_cast(arg); GRPC_ERROR_REF(error); // ref owned by lambda self->priority_policy_->work_serializer()->Run( [self, error]() { self->OnDeactivationTimerLocked(error); }, DEBUG_LOCATION); } void PriorityLb::ChildPriority::OnDeactivationTimerLocked(grpc_error* error) { if (error == GRPC_ERROR_NONE && deactivation_timer_callback_pending_ && !priority_policy_->shutting_down_) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_priority_trace)) { gpr_log(GPR_INFO, "[priority_lb %p] child %s (%p): deactivation timer fired, " "deleting child", priority_policy_.get(), name_.c_str(), this); } deactivation_timer_callback_pending_ = false; priority_policy_->DeleteChild(this); } Unref(DEBUG_LOCATION, "ChildPriority+timer"); GRPC_ERROR_UNREF(error); } // // PriorityLb::ChildPriority::Helper // void PriorityLb::ChildPriority::Helper::RequestReresolution() { if (priority_->priority_policy_->shutting_down_) return; if (priority_->ignore_reresolution_requests_) { return; } priority_->priority_policy_->channel_control_helper()->RequestReresolution(); } RefCountedPtr PriorityLb::ChildPriority::Helper::CreateSubchannel( ServerAddress address, const grpc_channel_args& args) { if (priority_->priority_policy_->shutting_down_) return nullptr; return priority_->priority_policy_->channel_control_helper() ->CreateSubchannel(std::move(address), args); } void PriorityLb::ChildPriority::Helper::UpdateState( grpc_connectivity_state state, const absl::Status& status, std::unique_ptr picker) { if (priority_->priority_policy_->shutting_down_) return; // Notify the priority. priority_->OnConnectivityStateUpdateLocked(state, status, std::move(picker)); } void PriorityLb::ChildPriority::Helper::AddTraceEvent( TraceSeverity severity, absl::string_view message) { if (priority_->priority_policy_->shutting_down_) return; priority_->priority_policy_->channel_control_helper()->AddTraceEvent(severity, message); } // // factory // class PriorityLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); } const char* name() const override { return kPriority; } RefCountedPtr ParseLoadBalancingConfig( const Json& json, grpc_error** error) const override { GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE); if (json.type() == Json::Type::JSON_NULL) { // priority was mentioned as a policy in the deprecated // loadBalancingPolicy field or in the client API. *error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:loadBalancingPolicy error:priority policy requires " "configuration. Please use loadBalancingConfig field of service " "config instead."); return nullptr; } std::vector error_list; // Children. std::map children; auto it = json.object_value().find("children"); if (it == json.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:children error:required field missing")); } else if (it->second.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:children error:type should be object")); } else { const Json::Object& object = it->second.object_value(); for (const auto& p : object) { const std::string& child_name = p.first; const Json& element = p.second; if (element.type() != Json::Type::OBJECT) { error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:children key:", child_name, " error:should be type object") .c_str())); } else { auto it2 = element.object_value().find("config"); if (it2 == element.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:children key:", child_name, " error:missing 'config' field") .c_str())); } else { grpc_error* parse_error = GRPC_ERROR_NONE; auto config = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig( it2->second, &parse_error); bool ignore_resolution_requests = false; // If present, ignore_reresolution_requests must be of type // boolean. auto it3 = element.object_value().find("ignore_reresolution_requests"); if (it3 != element.object_value().end()) { if (it3->second.type() == Json::Type::JSON_TRUE) { ignore_resolution_requests = true; } else if (it3->second.type() != Json::Type::JSON_FALSE) { error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:children key:", child_name, " field:ignore_reresolution_requests:should " "be type boolean") .c_str())); } } if (config == nullptr) { GPR_DEBUG_ASSERT(parse_error != GRPC_ERROR_NONE); error_list.push_back( GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING( absl::StrCat("field:children key:", child_name).c_str(), &parse_error, 1)); GRPC_ERROR_UNREF(parse_error); } children[child_name].config = std::move(config); children[child_name].ignore_reresolution_requests = ignore_resolution_requests; } } } } // Priorities. std::vector priorities; it = json.object_value().find("priorities"); if (it == json.object_value().end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:priorities error:required field missing")); } else if (it->second.type() != Json::Type::ARRAY) { error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "field:priorities error:type should be array")); } else { const Json::Array& array = it->second.array_value(); for (size_t i = 0; i < array.size(); ++i) { const Json& element = array[i]; if (element.type() != Json::Type::STRING) { error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:priorities element:", i, " error:should be type string") .c_str())); } else if (children.find(element.string_value()) == children.end()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:priorities element:", i, " error:unknown child '", element.string_value(), "'") .c_str())); } else { priorities.emplace_back(element.string_value()); } } if (priorities.size() != children.size()) { error_list.push_back(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("field:priorities error:priorities size (", priorities.size(), ") != children size (", children.size(), ")") .c_str())); } } if (error_list.empty()) { return MakeRefCounted(std::move(children), std::move(priorities)); } else { *error = GRPC_ERROR_CREATE_FROM_VECTOR( "priority_experimental LB policy config", &error_list); return nullptr; } } }; } // namespace } // namespace grpc_core // // Plugin registration // void grpc_lb_policy_priority_init() { grpc_core::LoadBalancingPolicyRegistry::Builder:: RegisterLoadBalancingPolicyFactory( absl::make_unique()); } void grpc_lb_policy_priority_shutdown() {}