// // Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "absl/base/thread_annotations.h" #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_cat.h" #include "absl/strings/str_join.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include #include #include #include "src/core/ext/filters/client_channel/lb_policy/backend_metric_data.h" #include "src/core/ext/filters/client_channel/lb_policy/oob_backend_metric.h" #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/static_stride_scheduler.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/resolved_address.h" #include "src/core/lib/json/json.h" #include "src/core/lib/json/json_args.h" #include "src/core/lib/json/json_object_loader.h" #include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy_factory.h" #include "src/core/lib/load_balancing/subchannel_interface.h" #include "src/core/lib/resolver/server_address.h" #include "src/core/lib/transport/connectivity_state.h" namespace grpc_core { TraceFlag grpc_lb_wrr_trace(false, "weighted_round_robin_lb"); namespace { constexpr absl::string_view kWeightedRoundRobin = "weighted_round_robin"; // Config for WRR policy. class WeightedRoundRobinConfig : public LoadBalancingPolicy::Config { public: WeightedRoundRobinConfig() = default; WeightedRoundRobinConfig(const WeightedRoundRobinConfig&) = delete; WeightedRoundRobinConfig& operator=(const WeightedRoundRobinConfig&) = delete; WeightedRoundRobinConfig(WeightedRoundRobinConfig&&) = delete; WeightedRoundRobinConfig& operator=(WeightedRoundRobinConfig&&) = delete; absl::string_view name() const override { return kWeightedRoundRobin; } bool enable_oob_load_report() const { return enable_oob_load_report_; } Duration oob_reporting_period() const { return oob_reporting_period_; } Duration blackout_period() const { return blackout_period_; } Duration weight_update_period() const { return weight_update_period_; } Duration weight_expiration_period() const { return weight_expiration_period_; } float error_utilization_penalty() const { return error_utilization_penalty_; } static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { static const auto* loader = JsonObjectLoader() .OptionalField("enableOobLoadReport", &WeightedRoundRobinConfig::enable_oob_load_report_) .OptionalField("oobReportingPeriod", &WeightedRoundRobinConfig::oob_reporting_period_) .OptionalField("blackoutPeriod", &WeightedRoundRobinConfig::blackout_period_) .OptionalField("weightUpdatePeriod", &WeightedRoundRobinConfig::weight_update_period_) .OptionalField("weightExpirationPeriod", &WeightedRoundRobinConfig::weight_expiration_period_) .OptionalField( "errorUtilizationPenalty", &WeightedRoundRobinConfig::error_utilization_penalty_) .Finish(); return loader; } void JsonPostLoad(const Json&, const JsonArgs&, ValidationErrors* errors) { // Impose lower bound of 100ms on weightUpdatePeriod. weight_update_period_ = std::max(weight_update_period_, Duration::Milliseconds(100)); if (error_utilization_penalty_ < 0) { ValidationErrors::ScopedField field(errors, ".errorUtilizationPenalty"); errors->AddError("must be non-negative"); } } private: bool enable_oob_load_report_ = false; Duration oob_reporting_period_ = Duration::Seconds(10); Duration blackout_period_ = Duration::Seconds(10); Duration weight_update_period_ = Duration::Seconds(1); Duration weight_expiration_period_ = Duration::Minutes(3); float error_utilization_penalty_ = 1.0; }; // WRR LB policy. class WeightedRoundRobin : public LoadBalancingPolicy { public: explicit WeightedRoundRobin(Args args); absl::string_view name() const override { return kWeightedRoundRobin; } absl::Status UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; private: // Represents the weight for a given address. class AddressWeight : public RefCounted { public: AddressWeight(RefCountedPtr wrr, std::string key) : wrr_(std::move(wrr)), key_(std::move(key)) {} ~AddressWeight() override; void MaybeUpdateWeight(double qps, double eps, double utilization, float error_utilization_penalty); float GetWeight(Timestamp now, Duration weight_expiration_period, Duration blackout_period); void ResetNonEmptySince(); private: RefCountedPtr wrr_; const std::string key_; Mutex mu_; float weight_ ABSL_GUARDED_BY(&mu_) = 0; Timestamp non_empty_since_ ABSL_GUARDED_BY(&mu_) = Timestamp::InfFuture(); Timestamp last_update_time_ ABSL_GUARDED_BY(&mu_) = Timestamp::InfPast(); }; // Forward declaration. class WeightedRoundRobinSubchannelList; // Data for a particular subchannel in a subchannel list. // This subclass adds the following functionality: // - Tracks the previous connectivity state of the subchannel, so that // we know how many subchannels are in each state. class WeightedRoundRobinSubchannelData : public SubchannelData { public: WeightedRoundRobinSubchannelData( SubchannelList* subchannel_list, const ServerAddress& address, RefCountedPtr sc); absl::optional connectivity_state() const { return logical_connectivity_state_; } RefCountedPtr weight() const { return weight_; } private: class OobWatcher : public OobBackendMetricWatcher { public: OobWatcher(RefCountedPtr weight, float error_utilization_penalty) : weight_(std::move(weight)), error_utilization_penalty_(error_utilization_penalty) {} void OnBackendMetricReport( const BackendMetricData& backend_metric_data) override; private: RefCountedPtr weight_; const float error_utilization_penalty_; }; // Performs connectivity state updates that need to be done only // after we have started watching. void ProcessConnectivityChangeLocked( absl::optional old_state, grpc_connectivity_state new_state) override; // Updates the logical connectivity state. void UpdateLogicalConnectivityStateLocked( grpc_connectivity_state connectivity_state); // The logical connectivity state of the subchannel. // Note that the logical connectivity state may differ from the // actual reported state in some cases (e.g., after we see // TRANSIENT_FAILURE, we ignore any subsequent state changes until // we see READY). absl::optional logical_connectivity_state_; RefCountedPtr weight_; }; // A list of subchannels. class WeightedRoundRobinSubchannelList : public SubchannelList { public: WeightedRoundRobinSubchannelList(WeightedRoundRobin* policy, ServerAddressList addresses, const ChannelArgs& args) : SubchannelList(policy, (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) ? "WeightedRoundRobinSubchannelList" : nullptr), std::move(addresses), policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' // pollset_sets will include the LB policy's pollset_set. policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); } ~WeightedRoundRobinSubchannelList() override { WeightedRoundRobin* p = static_cast(policy()); p->Unref(DEBUG_LOCATION, "subchannel_list"); } // Updates the counters of subchannels in each state when a // subchannel transitions from old_state to new_state. void UpdateStateCountersLocked( absl::optional old_state, grpc_connectivity_state new_state); // Ensures that the right subchannel list is used and then updates // the aggregated connectivity state based on the subchannel list's // state counters. void MaybeUpdateAggregatedConnectivityStateLocked( absl::Status status_for_tf); private: std::shared_ptr work_serializer() const override { return static_cast(policy())->work_serializer(); } std::string CountersString() const { return absl::StrCat("num_subchannels=", num_subchannels(), " num_ready=", num_ready_, " num_connecting=", num_connecting_, " num_transient_failure=", num_transient_failure_); } size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; absl::Status last_failure_; }; // A picker that performs WRR picks with weights based on // endpoint-reported utilization and QPS. class Picker : public SubchannelPicker { public: Picker(RefCountedPtr wrr, WeightedRoundRobinSubchannelList* subchannel_list); ~Picker() override; PickResult Pick(PickArgs args) override; void Orphan() override; private: // A call tracker that collects per-call endpoint utilization reports. class SubchannelCallTracker : public SubchannelCallTrackerInterface { public: SubchannelCallTracker(RefCountedPtr weight, float error_utilization_penalty) : weight_(std::move(weight)), error_utilization_penalty_(error_utilization_penalty) {} void Start() override {} void Finish(FinishArgs args) override; private: RefCountedPtr weight_; const float error_utilization_penalty_; }; // Info stored about each subchannel. struct SubchannelInfo { SubchannelInfo(RefCountedPtr subchannel, RefCountedPtr weight) : subchannel(std::move(subchannel)), weight(std::move(weight)) {} RefCountedPtr subchannel; RefCountedPtr weight; }; // Returns the index into subchannels_ to be picked. size_t PickIndex(); // Builds a new scheduler and swaps it into place, then starts a // timer for the next update. void BuildSchedulerAndStartTimerLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&timer_mu_); RefCountedPtr wrr_; RefCountedPtr config_; std::vector subchannels_; Mutex scheduler_mu_; std::shared_ptr scheduler_ ABSL_GUARDED_BY(&scheduler_mu_); Mutex timer_mu_ ABSL_ACQUIRED_BEFORE(&scheduler_mu_); absl::optional timer_handle_ ABSL_GUARDED_BY(&timer_mu_); // Used when falling back to RR. std::atomic last_picked_index_; }; ~WeightedRoundRobin() override; void ShutdownLocked() override; RefCountedPtr GetOrCreateWeight( const grpc_resolved_address& address); RefCountedPtr config_; // List of subchannels. RefCountedPtr subchannel_list_; // Latest pending subchannel list. // When we get an updated address list, we create a new subchannel list // for it here, and we wait to swap it into subchannel_list_ until the new // list becomes READY. RefCountedPtr latest_pending_subchannel_list_; Mutex address_weight_map_mu_; std::map> address_weight_map_ ABSL_GUARDED_BY(&address_weight_map_mu_); bool shutdown_ = false; absl::BitGen bit_gen_; // Accessed by picker. std::atomic scheduler_state_{absl::Uniform(bit_gen_)}; }; // // WeightedRoundRobin::AddressWeight // WeightedRoundRobin::AddressWeight::~AddressWeight() { MutexLock lock(&wrr_->address_weight_map_mu_); auto it = wrr_->address_weight_map_.find(key_); if (it != wrr_->address_weight_map_.end() && it->second == this) { wrr_->address_weight_map_.erase(it); } } void WeightedRoundRobin::AddressWeight::MaybeUpdateWeight( double qps, double eps, double utilization, float error_utilization_penalty) { // Compute weight. float weight = 0; if (qps > 0 && utilization > 0) { double penalty = 0.0; if (eps > 0 && error_utilization_penalty > 0) { penalty = eps / qps * error_utilization_penalty; } weight = qps / (utilization + penalty); } if (weight == 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f: " "error_util_penalty=%f, weight=%f (not updating)", wrr_.get(), key_.c_str(), qps, eps, utilization, error_utilization_penalty, weight); } return; } Timestamp now = Timestamp::Now(); // Grab the lock and update the data. MutexLock lock(&mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f " "error_util_penalty=%f : setting weight=%f weight_=%f now=%s " "last_update_time_=%s non_empty_since_=%s", wrr_.get(), key_.c_str(), qps, eps, utilization, error_utilization_penalty, weight, weight_, now.ToString().c_str(), last_update_time_.ToString().c_str(), non_empty_since_.ToString().c_str()); } if (non_empty_since_ == Timestamp::InfFuture()) non_empty_since_ = now; weight_ = weight; last_update_time_ = now; } float WeightedRoundRobin::AddressWeight::GetWeight( Timestamp now, Duration weight_expiration_period, Duration blackout_period) { MutexLock lock(&mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] subchannel %s: getting weight: now=%s " "weight_expiration_period=%s blackout_period=%s " "last_update_time_=%s non_empty_since_=%s weight_=%f", wrr_.get(), key_.c_str(), now.ToString().c_str(), weight_expiration_period.ToString().c_str(), blackout_period.ToString().c_str(), last_update_time_.ToString().c_str(), non_empty_since_.ToString().c_str(), weight_); } // If the most recent update was longer ago than the expiration // period, reset non_empty_since_ so that we apply the blackout period // again if we start getting data again in the future, and return 0. if (now - last_update_time_ >= weight_expiration_period) { non_empty_since_ = Timestamp::InfFuture(); return 0; } // If we don't have at least blackout_period worth of data, return 0. if (blackout_period > Duration::Zero() && now - non_empty_since_ < blackout_period) { return 0; } // Otherwise, return the weight. return weight_; } void WeightedRoundRobin::AddressWeight::ResetNonEmptySince() { MutexLock lock(&mu_); non_empty_since_ = Timestamp::InfFuture(); } // // WeightedRoundRobin::Picker::SubchannelCallTracker // void WeightedRoundRobin::Picker::SubchannelCallTracker::Finish( FinishArgs args) { auto* backend_metric_data = args.backend_metric_accessor->GetBackendMetricData(); double qps = 0; double eps = 0; double utilization = 0; if (backend_metric_data != nullptr) { qps = backend_metric_data->qps; eps = backend_metric_data->eps; utilization = backend_metric_data->application_utilization; if (utilization <= 0) { utilization = backend_metric_data->cpu_utilization; } } weight_->MaybeUpdateWeight(qps, eps, utilization, error_utilization_penalty_); } // // WeightedRoundRobin::Picker // WeightedRoundRobin::Picker::Picker( RefCountedPtr wrr, WeightedRoundRobinSubchannelList* subchannel_list) : wrr_(std::move(wrr)), config_(wrr_->config_), last_picked_index_(absl::Uniform(wrr_->bit_gen_)) { for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { WeightedRoundRobinSubchannelData* sd = subchannel_list->subchannel(i); if (sd->connectivity_state() == GRPC_CHANNEL_READY) { subchannels_.emplace_back(sd->subchannel()->Ref(), sd->weight()); } } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] created picker from subchannel_list=%p " "with %" PRIuPTR " subchannels", wrr_.get(), this, subchannel_list, subchannels_.size()); } BuildSchedulerAndStartTimerLocked(); } WeightedRoundRobin::Picker::~Picker() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] destroying picker", wrr_.get(), this); } } void WeightedRoundRobin::Picker::Orphan() { MutexLock lock(&timer_mu_); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] cancelling timer", wrr_.get(), this); } wrr_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_); timer_handle_.reset(); } WeightedRoundRobin::PickResult WeightedRoundRobin::Picker::Pick( PickArgs /*args*/) { size_t index = PickIndex(); GPR_ASSERT(index < subchannels_.size()); auto& subchannel_info = subchannels_[index]; // Collect per-call utilization data if needed. std::unique_ptr subchannel_call_tracker; if (!config_->enable_oob_load_report()) { subchannel_call_tracker = std::make_unique( subchannel_info.weight, config_->error_utilization_penalty()); } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] returning index %" PRIuPTR ", subchannel=%p", wrr_.get(), this, index, subchannel_info.subchannel.get()); } return PickResult::Complete(subchannel_info.subchannel, std::move(subchannel_call_tracker)); } size_t WeightedRoundRobin::Picker::PickIndex() { // Grab a ref to the scheduler. std::shared_ptr scheduler; { MutexLock lock(&scheduler_mu_); scheduler = scheduler_; } // If we have a scheduler, use it to do a WRR pick. if (scheduler != nullptr) return scheduler->Pick(); // We don't have a scheduler (i.e., either all of the weights are 0 or // there is only one subchannel), so fall back to RR. return last_picked_index_.fetch_add(1) % subchannels_.size(); } void WeightedRoundRobin::Picker::BuildSchedulerAndStartTimerLocked() { // Build scheduler. const Timestamp now = Timestamp::Now(); std::vector weights; weights.reserve(subchannels_.size()); for (const auto& subchannel : subchannels_) { weights.push_back(subchannel.weight->GetWeight( now, config_->weight_expiration_period(), config_->blackout_period())); } if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] new weights: %s", wrr_.get(), this, absl::StrJoin(weights, " ").c_str()); } auto scheduler_or = StaticStrideScheduler::Make( weights, [this]() { return wrr_->scheduler_state_.fetch_add(1); }); std::shared_ptr scheduler; if (scheduler_or.has_value()) { scheduler = std::make_shared(std::move(*scheduler_or)); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] new scheduler: %p", wrr_.get(), this, scheduler.get()); } } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] no scheduler, falling back to RR", wrr_.get(), this); } { MutexLock lock(&scheduler_mu_); scheduler_ = std::move(scheduler); } // Start timer. WeakRefCountedPtr self = WeakRef(); timer_handle_ = wrr_->channel_control_helper()->GetEventEngine()->RunAfter( config_->weight_update_period(), [self = std::move(self)]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; { MutexLock lock(&self->timer_mu_); if (self->timer_handle_.has_value()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p picker %p] timer fired", self->wrr_.get(), self.get()); } self->BuildSchedulerAndStartTimerLocked(); } } // Release ref before ExecCtx goes out of scope. self.reset(); }); } // // WeightedRoundRobin // WeightedRoundRobin::WeightedRoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] Created", this); } } WeightedRoundRobin::~WeightedRoundRobin() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] Destroying Round Robin policy", this); } GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); } void WeightedRoundRobin::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] Shutting down", this); } shutdown_ = true; subchannel_list_.reset(); latest_pending_subchannel_list_.reset(); } void WeightedRoundRobin::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); if (latest_pending_subchannel_list_ != nullptr) { latest_pending_subchannel_list_->ResetBackoffLocked(); } } absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) { config_ = std::move(args.config); ServerAddressList addresses; if (args.addresses.ok()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses", this, args.addresses->size()); } // Weed out duplicate addresses. Also sort the addresses so that if // the set of the addresses don't change, their indexes in the // subchannel list don't change, since this avoids unnecessary churn // in the picker. Note that this does not ensure that if a given // address remains present that it will have the same index; if, // for example, an address at the end of the list is replaced with one // that sorts much earlier in the list, then all of the addresses in // between those two positions will have changed indexes. struct AddressLessThan { bool operator()(const ServerAddress& address1, const ServerAddress& address2) const { const grpc_resolved_address& addr1 = address1.address(); const grpc_resolved_address& addr2 = address2.address(); if (addr1.len != addr2.len) return addr1.len < addr2.len; return memcmp(addr1.addr, addr2.addr, addr1.len) < 0; } }; std::set ordered_addresses( args.addresses->begin(), args.addresses->end()); addresses = ServerAddressList(ordered_addresses.begin(), ordered_addresses.end()); } else { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this, args.addresses.status().ToString().c_str()); } // If we already have a subchannel list, then keep using the existing // list, but still report back that the update was not accepted. if (subchannel_list_ != nullptr) return args.addresses.status(); } // Create new subchannel list, replacing the previous pending list, if any. if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) && latest_pending_subchannel_list_ != nullptr) { gpr_log(GPR_INFO, "[WRR %p] replacing previous pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } latest_pending_subchannel_list_ = MakeRefCounted( this, std::move(addresses), args.args); latest_pending_subchannel_list_->StartWatchingLocked(); // If the new list is empty, immediately promote it to // subchannel_list_ and report TRANSIENT_FAILURE. if (latest_pending_subchannel_list_->num_subchannels() == 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) && subchannel_list_ != nullptr) { gpr_log(GPR_INFO, "[WRR %p] replacing previous subchannel list %p", this, subchannel_list_.get()); } subchannel_list_ = std::move(latest_pending_subchannel_list_); absl::Status status = args.addresses.ok() ? absl::UnavailableError(absl::StrCat( "empty address list: ", args.resolution_note)) : args.addresses.status(); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, status, MakeRefCounted(status)); return status; } // Otherwise, if this is the initial update, immediately promote it to // subchannel_list_. if (subchannel_list_.get() == nullptr) { subchannel_list_ = std::move(latest_pending_subchannel_list_); } return absl::OkStatus(); } RefCountedPtr WeightedRoundRobin::GetOrCreateWeight(const grpc_resolved_address& address) { auto key = grpc_sockaddr_to_uri(&address); if (!key.ok()) return nullptr; MutexLock lock(&address_weight_map_mu_); auto it = address_weight_map_.find(*key); if (it != address_weight_map_.end()) { auto weight = it->second->RefIfNonZero(); if (weight != nullptr) return weight; } auto weight = MakeRefCounted(Ref(DEBUG_LOCATION, "AddressWeight"), *key); address_weight_map_.emplace(*key, weight.get()); return weight; } // // WeightedRoundRobin::WeightedRoundRobinSubchannelList // void WeightedRoundRobin::WeightedRoundRobinSubchannelList:: UpdateStateCountersLocked(absl::optional old_state, grpc_connectivity_state new_state) { if (old_state.has_value()) { GPR_ASSERT(*old_state != GRPC_CHANNEL_SHUTDOWN); if (*old_state == GRPC_CHANNEL_READY) { GPR_ASSERT(num_ready_ > 0); --num_ready_; } else if (*old_state == GRPC_CHANNEL_CONNECTING) { GPR_ASSERT(num_connecting_ > 0); --num_connecting_; } else if (*old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(num_transient_failure_ > 0); --num_transient_failure_; } } GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); if (new_state == GRPC_CHANNEL_READY) { ++num_ready_; } else if (new_state == GRPC_CHANNEL_CONNECTING) { ++num_connecting_; } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++num_transient_failure_; } } void WeightedRoundRobin::WeightedRoundRobinSubchannelList:: MaybeUpdateAggregatedConnectivityStateLocked(absl::Status status_for_tf) { WeightedRoundRobin* p = static_cast(policy()); // If this is latest_pending_subchannel_list_, then swap it into // subchannel_list_ in the following cases: // - subchannel_list_ has no READY subchannels. // - This list has at least one READY subchannel and we have seen the // initial connectivity state notification for all subchannels. // - All of the subchannels in this list are in TRANSIENT_FAILURE. // (This may cause the channel to go from READY to TRANSIENT_FAILURE, // but we're doing what the control plane told us to do.) if (p->latest_pending_subchannel_list_.get() == this && (p->subchannel_list_->num_ready_ == 0 || (num_ready_ > 0 && AllSubchannelsSeenInitialState()) || num_transient_failure_ == num_subchannels())) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { const std::string old_counters_string = p->subchannel_list_ != nullptr ? p->subchannel_list_->CountersString() : ""; gpr_log( GPR_INFO, "[WRR %p] swapping out subchannel list %p (%s) in favor of %p (%s)", p, p->subchannel_list_.get(), old_counters_string.c_str(), this, CountersString().c_str()); } p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } // Only set connectivity state if this is the current subchannel list. if (p->subchannel_list_.get() != this) return; // First matching rule wins: // 1) ANY subchannel is READY => policy is READY. // 2) ANY subchannel is CONNECTING => policy is CONNECTING. // 3) ALL subchannels are TRANSIENT_FAILURE => policy is TRANSIENT_FAILURE. if (num_ready_ > 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] reporting READY with subchannel list %p", p, this); } p->channel_control_helper()->UpdateState( GRPC_CHANNEL_READY, absl::Status(), MakeRefCounted(p->Ref(), this)); } else if (num_connecting_ > 0) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] reporting CONNECTING with subchannel list %p", p, this); } p->channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, absl::Status(), MakeRefCounted(p->Ref(DEBUG_LOCATION, "QueuePicker"))); } else if (num_transient_failure_ == num_subchannels()) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log( GPR_INFO, "[WRR %p] reporting TRANSIENT_FAILURE with subchannel list %p: %s", p, this, status_for_tf.ToString().c_str()); } if (!status_for_tf.ok()) { last_failure_ = absl::UnavailableError( absl::StrCat("connections to all backends failing; last error: ", status_for_tf.ToString())); } p->channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, last_failure_, MakeRefCounted(last_failure_)); } } // // WeightedRoundRobin::WeightedRoundRobinSubchannelData::OobWatcher // void WeightedRoundRobin::WeightedRoundRobinSubchannelData::OobWatcher:: OnBackendMetricReport(const BackendMetricData& backend_metric_data) { double utilization = backend_metric_data.application_utilization; if (utilization <= 0) { utilization = backend_metric_data.cpu_utilization; } weight_->MaybeUpdateWeight(backend_metric_data.qps, backend_metric_data.eps, utilization, error_utilization_penalty_); } // // WeightedRoundRobin::WeightedRoundRobinSubchannelData // WeightedRoundRobin::WeightedRoundRobinSubchannelData:: WeightedRoundRobinSubchannelData( SubchannelList* subchannel_list, const ServerAddress& address, RefCountedPtr sc) : SubchannelData(subchannel_list, address, std::move(sc)), weight_(static_cast(subchannel_list->policy()) ->GetOrCreateWeight(address.address())) { // Start OOB watch if configured. WeightedRoundRobin* p = static_cast(subchannel_list->policy()); if (p->config_->enable_oob_load_report()) { subchannel()->AddDataWatcher(MakeOobBackendMetricWatcher( p->config_->oob_reporting_period(), std::make_unique(weight_, p->config_->error_utilization_penalty()))); } } void WeightedRoundRobin::WeightedRoundRobinSubchannelData:: ProcessConnectivityChangeLocked( absl::optional old_state, grpc_connectivity_state new_state) { WeightedRoundRobin* p = static_cast(subchannel_list()->policy()); GPR_ASSERT(subchannel() != nullptr); // If this is not the initial state notification and the new state is // TRANSIENT_FAILURE or IDLE, re-resolve. // Note that we don't want to do this on the initial state notification, // because that would result in an endless loop of re-resolution. if (old_state.has_value() && (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE || new_state == GRPC_CHANNEL_IDLE)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] Subchannel %p reported %s; requesting re-resolution", p, subchannel(), ConnectivityStateName(new_state)); } p->channel_control_helper()->RequestReresolution(); } if (new_state == GRPC_CHANNEL_IDLE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] Subchannel %p reported IDLE; requesting connection", p, subchannel()); } subchannel()->RequestConnection(); } else if (new_state == GRPC_CHANNEL_READY) { // If we transition back to READY state, restart the blackout period. // Note that we cannot guarantee that we will never receive // lingering callbacks for backend metric reports from the previous // connection after the new connection has been established, but they // should be masked by new backend metric reports from the new // connection by the time the blackout period ends. weight_->ResetNonEmptySince(); } // Update logical connectivity state. UpdateLogicalConnectivityStateLocked(new_state); // Update the policy state. subchannel_list()->MaybeUpdateAggregatedConnectivityStateLocked( connectivity_status()); } void WeightedRoundRobin::WeightedRoundRobinSubchannelData:: UpdateLogicalConnectivityStateLocked( grpc_connectivity_state connectivity_state) { WeightedRoundRobin* p = static_cast(subchannel_list()->policy()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log( GPR_INFO, "[WRR %p] connectivity changed for subchannel %p, subchannel_list %p " "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels(), (logical_connectivity_state_.has_value() ? ConnectivityStateName(*logical_connectivity_state_) : "N/A"), ConnectivityStateName(connectivity_state)); } // Decide what state to report for aggregation purposes. // If the last logical state was TRANSIENT_FAILURE, then ignore the // state change unless the new state is READY. if (logical_connectivity_state_.has_value() && *logical_connectivity_state_ == GRPC_CHANNEL_TRANSIENT_FAILURE && connectivity_state != GRPC_CHANNEL_READY) { return; } // If the new state is IDLE, treat it as CONNECTING, since it will // immediately transition into CONNECTING anyway. if (connectivity_state == GRPC_CHANNEL_IDLE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) { gpr_log(GPR_INFO, "[WRR %p] subchannel %p, subchannel_list %p (index %" PRIuPTR " of %" PRIuPTR "): treating IDLE as CONNECTING", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels()); } connectivity_state = GRPC_CHANNEL_CONNECTING; } // If no change, return false. if (logical_connectivity_state_.has_value() && *logical_connectivity_state_ == connectivity_state) { return; } // Otherwise, update counters and logical state. subchannel_list()->UpdateStateCountersLocked(logical_connectivity_state_, connectivity_state); logical_connectivity_state_ = connectivity_state; } // // factory // class WeightedRoundRobinFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); } absl::string_view name() const override { return kWeightedRoundRobin; } absl::StatusOr> ParseLoadBalancingConfig(const Json& json) const override { return LoadFromJson>( json, JsonArgs(), "errors validating weighted_round_robin LB policy config"); } }; } // namespace void RegisterWeightedRoundRobinLbPolicy(CoreConfiguration::Builder* builder) { builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( std::make_unique()); } } // namespace grpc_core