// // Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include #include "src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.h" #include #include #include #include #include #include #include #include #include #include #include #include #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/string_view.h" #include "absl/types/variant.h" #include #include #include #include "src/core/ext/filters/client_channel/lb_policy/child_policy_handler.h" #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/config/core_configuration.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/debug_location.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/validation_errors.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/json/json.h" #include "src/core/lib/load_balancing/lb_policy.h" #include "src/core/lib/load_balancing/lb_policy_factory.h" #include "src/core/lib/load_balancing/lb_policy_registry.h" #include "src/core/lib/load_balancing/subchannel_interface.h" #include "src/core/lib/resolver/server_address.h" #include "src/core/lib/transport/connectivity_state.h" namespace grpc_core { TraceFlag grpc_outlier_detection_lb_trace(false, "outlier_detection_lb"); namespace { using ::grpc_event_engine::experimental::EventEngine; constexpr absl::string_view kOutlierDetection = "outlier_detection_experimental"; // Config for xDS Cluster Impl LB policy. class OutlierDetectionLbConfig : public LoadBalancingPolicy::Config { public: OutlierDetectionLbConfig( OutlierDetectionConfig outlier_detection_config, RefCountedPtr child_policy) : outlier_detection_config_(outlier_detection_config), child_policy_(std::move(child_policy)) {} absl::string_view name() const override { return kOutlierDetection; } bool CountingEnabled() const { return outlier_detection_config_.success_rate_ejection.has_value() || outlier_detection_config_.failure_percentage_ejection.has_value(); } const OutlierDetectionConfig& outlier_detection_config() const { return outlier_detection_config_; } RefCountedPtr child_policy() const { return child_policy_; } private: OutlierDetectionConfig outlier_detection_config_; RefCountedPtr child_policy_; }; // xDS Cluster Impl LB policy. class OutlierDetectionLb : public LoadBalancingPolicy { public: explicit OutlierDetectionLb(Args args); absl::string_view name() const override { return kOutlierDetection; } absl::Status UpdateLocked(UpdateArgs args) override; void ExitIdleLocked() override; void ResetBackoffLocked() override; private: class SubchannelState; class SubchannelWrapper : public DelegatingSubchannel { public: SubchannelWrapper(RefCountedPtr subchannel_state, RefCountedPtr subchannel) : DelegatingSubchannel(std::move(subchannel)), subchannel_state_(std::move(subchannel_state)) { if (subchannel_state_ != nullptr) { subchannel_state_->AddSubchannel(this); if (subchannel_state_->ejection_time().has_value()) { ejected_ = true; } } } ~SubchannelWrapper() override { if (subchannel_state_ != nullptr) { subchannel_state_->RemoveSubchannel(this); } } void Eject(); void Uneject(); void WatchConnectivityState( std::unique_ptr watcher) override; void CancelConnectivityStateWatch( ConnectivityStateWatcherInterface* watcher) override; RefCountedPtr subchannel_state() const { return subchannel_state_; } private: class WatcherWrapper : public SubchannelInterface::ConnectivityStateWatcherInterface { public: WatcherWrapper(std::unique_ptr< SubchannelInterface::ConnectivityStateWatcherInterface> watcher, bool ejected) : watcher_(std::move(watcher)), ejected_(ejected) {} void Eject() { ejected_ = true; if (last_seen_state_.has_value()) { watcher_->OnConnectivityStateChange( GRPC_CHANNEL_TRANSIENT_FAILURE, absl::UnavailableError( "subchannel ejected by outlier detection")); } } void Uneject() { ejected_ = false; if (last_seen_state_.has_value()) { watcher_->OnConnectivityStateChange(*last_seen_state_, last_seen_status_); } } void OnConnectivityStateChange(grpc_connectivity_state new_state, absl::Status status) override { const bool send_update = !last_seen_state_.has_value() || !ejected_; last_seen_state_ = new_state; last_seen_status_ = status; if (send_update) { if (ejected_) { new_state = GRPC_CHANNEL_TRANSIENT_FAILURE; status = absl::UnavailableError( "subchannel ejected by outlier detection"); } watcher_->OnConnectivityStateChange(new_state, status); } } grpc_pollset_set* interested_parties() override { return watcher_->interested_parties(); } private: std::unique_ptr watcher_; absl::optional last_seen_state_; absl::Status last_seen_status_; bool ejected_; }; RefCountedPtr subchannel_state_; bool ejected_ = false; std::map watchers_; }; class SubchannelState : public RefCounted { public: struct Bucket { std::atomic successes; std::atomic failures; }; void RotateBucket() { backup_bucket_->successes = 0; backup_bucket_->failures = 0; current_bucket_.swap(backup_bucket_); active_bucket_.store(current_bucket_.get()); } absl::optional> GetSuccessRateAndVolume() { uint64_t total_request = backup_bucket_->successes + backup_bucket_->failures; if (total_request == 0) { return absl::nullopt; } double success_rate = backup_bucket_->successes * 100.0 / (backup_bucket_->successes + backup_bucket_->failures); return { {success_rate, backup_bucket_->successes + backup_bucket_->failures}}; } void AddSubchannel(SubchannelWrapper* wrapper) { subchannels_.insert(wrapper); } void RemoveSubchannel(SubchannelWrapper* wrapper) { subchannels_.erase(wrapper); } void AddSuccessCount() { active_bucket_.load()->successes.fetch_add(1); } void AddFailureCount() { active_bucket_.load()->failures.fetch_add(1); } absl::optional ejection_time() const { return ejection_time_; } void Eject(const Timestamp& time) { ejection_time_ = time; ++multiplier_; for (auto& subchannel : subchannels_) { subchannel->Eject(); } } void Uneject() { ejection_time_.reset(); for (auto& subchannel : subchannels_) { subchannel->Uneject(); } } bool MaybeUneject(uint64_t base_ejection_time_in_millis, uint64_t max_ejection_time_in_millis) { if (!ejection_time_.has_value()) { if (multiplier_ > 0) { --multiplier_; } } else { GPR_ASSERT(ejection_time_.has_value()); auto change_time = ejection_time_.value() + Duration::Milliseconds(std::min( base_ejection_time_in_millis * multiplier_, std::max(base_ejection_time_in_millis, max_ejection_time_in_millis))); if (change_time < Timestamp::Now()) { Uneject(); return true; } } return false; } void DisableEjection() { Uneject(); multiplier_ = 0; } private: std::unique_ptr current_bucket_ = std::make_unique(); std::unique_ptr backup_bucket_ = std::make_unique(); // The bucket used to update call counts. // Points to either current_bucket or active_bucket. std::atomic active_bucket_{current_bucket_.get()}; uint32_t multiplier_ = 0; absl::optional ejection_time_; std::set subchannels_; }; // A picker that wraps the picker from the child to perform outlier detection. class Picker : public SubchannelPicker { public: Picker(OutlierDetectionLb* outlier_detection_lb, RefCountedPtr picker, bool counting_enabled); PickResult Pick(PickArgs args) override; private: class SubchannelCallTracker; RefCountedPtr picker_; bool counting_enabled_; }; class Helper : public ChannelControlHelper { public: explicit Helper(RefCountedPtr outlier_detection_policy) : outlier_detection_policy_(std::move(outlier_detection_policy)) {} ~Helper() override { outlier_detection_policy_.reset(DEBUG_LOCATION, "Helper"); } RefCountedPtr CreateSubchannel( ServerAddress address, const ChannelArgs& args) override; void UpdateState(grpc_connectivity_state state, const absl::Status& status, RefCountedPtr picker) override; void RequestReresolution() override; absl::string_view GetAuthority() override; grpc_event_engine::experimental::EventEngine* GetEventEngine() override; void AddTraceEvent(TraceSeverity severity, absl::string_view message) override; private: RefCountedPtr outlier_detection_policy_; }; class EjectionTimer : public InternallyRefCounted { public: EjectionTimer(RefCountedPtr parent, Timestamp start_time); void Orphan() override; Timestamp StartTime() const { return start_time_; } private: void OnTimerLocked(); RefCountedPtr parent_; absl::optional timer_handle_; Timestamp start_time_; absl::BitGen bit_gen_; }; ~OutlierDetectionLb() override; static std::string MakeKeyForAddress(const ServerAddress& address); void ShutdownLocked() override; OrphanablePtr CreateChildPolicyLocked( const ChannelArgs& args); void MaybeUpdatePickerLocked(); // Current config from the resolver. RefCountedPtr config_; // Internal state. bool shutting_down_ = false; OrphanablePtr child_policy_; // Latest state and picker reported by the child policy. grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE; absl::Status status_; RefCountedPtr picker_; std::map> subchannel_state_map_; OrphanablePtr ejection_timer_; }; // // OutlierDetectionLb::SubchannelWrapper // void OutlierDetectionLb::SubchannelWrapper::Eject() { ejected_ = true; for (auto& watcher : watchers_) { watcher.second->Eject(); } } void OutlierDetectionLb::SubchannelWrapper::Uneject() { ejected_ = false; for (auto& watcher : watchers_) { watcher.second->Uneject(); } } void OutlierDetectionLb::SubchannelWrapper::WatchConnectivityState( std::unique_ptr watcher) { ConnectivityStateWatcherInterface* watcher_ptr = watcher.get(); auto watcher_wrapper = std::make_unique(std::move(watcher), ejected_); watchers_.emplace(watcher_ptr, watcher_wrapper.get()); wrapped_subchannel()->WatchConnectivityState(std::move(watcher_wrapper)); } void OutlierDetectionLb::SubchannelWrapper::CancelConnectivityStateWatch( ConnectivityStateWatcherInterface* watcher) { auto it = watchers_.find(watcher); if (it == watchers_.end()) return; wrapped_subchannel()->CancelConnectivityStateWatch(it->second); watchers_.erase(it); } // // OutlierDetectionLb::Picker::SubchannelCallTracker // class OutlierDetectionLb::Picker::SubchannelCallTracker : public LoadBalancingPolicy::SubchannelCallTrackerInterface { public: SubchannelCallTracker( std::unique_ptr original_subchannel_call_tracker, RefCountedPtr subchannel_state) : original_subchannel_call_tracker_( std::move(original_subchannel_call_tracker)), subchannel_state_(std::move(subchannel_state)) {} ~SubchannelCallTracker() override { subchannel_state_.reset(DEBUG_LOCATION, "SubchannelCallTracker"); } void Start() override { // This tracker does not care about started calls only finished calls. // Delegate if needed. if (original_subchannel_call_tracker_ != nullptr) { original_subchannel_call_tracker_->Start(); } } void Finish(FinishArgs args) override { // Delegate if needed. if (original_subchannel_call_tracker_ != nullptr) { original_subchannel_call_tracker_->Finish(args); } // Record call completion based on status for outlier detection // calculations. if (subchannel_state_ != nullptr) { if (args.status.ok()) { subchannel_state_->AddSuccessCount(); } else { subchannel_state_->AddFailureCount(); } } } private: std::unique_ptr original_subchannel_call_tracker_; RefCountedPtr subchannel_state_; }; // // OutlierDetectionLb::Picker // OutlierDetectionLb::Picker::Picker(OutlierDetectionLb* outlier_detection_lb, RefCountedPtr picker, bool counting_enabled) : picker_(std::move(picker)), counting_enabled_(counting_enabled) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] constructed new picker %p and counting " "is %s", outlier_detection_lb, this, (counting_enabled ? "enabled" : "disabled")); } } LoadBalancingPolicy::PickResult OutlierDetectionLb::Picker::Pick( LoadBalancingPolicy::PickArgs args) { if (picker_ == nullptr) { // Should never happen. return PickResult::Fail(absl::InternalError( "outlier_detection picker not given any child picker")); } // Delegate to child picker PickResult result = picker_->Pick(args); auto* complete_pick = absl::get_if(&result.result); if (complete_pick != nullptr) { // Unwrap subchannel to pass back up the stack. auto* subchannel_wrapper = static_cast(complete_pick->subchannel.get()); // Inject subchannel call tracker to record call completion as long as // not both success_rate_ejection and failure_percentage_ejection are unset. if (counting_enabled_) { complete_pick->subchannel_call_tracker = std::make_unique( std::move(complete_pick->subchannel_call_tracker), subchannel_wrapper->subchannel_state()); } complete_pick->subchannel = subchannel_wrapper->wrapped_subchannel(); } return result; } // // OutlierDetectionLb // OutlierDetectionLb::OutlierDetectionLb(Args args) : LoadBalancingPolicy(std::move(args)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] created", this); } } OutlierDetectionLb::~OutlierDetectionLb() { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] destroying outlier_detection LB policy", this); } } std::string OutlierDetectionLb::MakeKeyForAddress( const ServerAddress& address) { // Use only the address, not the attributes. auto addr_str = grpc_sockaddr_to_string(&address.address(), false); return addr_str.ok() ? addr_str.value() : addr_str.status().ToString(); } void OutlierDetectionLb::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] shutting down", this); } ejection_timer_.reset(); shutting_down_ = true; // Remove the child policy's interested_parties pollset_set from the // xDS policy. if (child_policy_ != nullptr) { grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(), interested_parties()); child_policy_.reset(); } // Drop our ref to the child's picker, in case it's holding a ref to // the child. picker_.reset(); } void OutlierDetectionLb::ExitIdleLocked() { if (child_policy_ != nullptr) child_policy_->ExitIdleLocked(); } void OutlierDetectionLb::ResetBackoffLocked() { if (child_policy_ != nullptr) child_policy_->ResetBackoffLocked(); } absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] Received update", this); } auto old_config = std::move(config_); // Update config. config_ = std::move(args.config); // Update outlier detection timer. if (!config_->CountingEnabled()) { // No need for timer. Cancel the current timer, if any. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] counting disabled, cancelling timer", this); } ejection_timer_.reset(); } else if (ejection_timer_ == nullptr) { // No timer running. Start it now. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] starting timer", this); } ejection_timer_ = MakeOrphanable(Ref(), Timestamp::Now()); for (const auto& p : subchannel_state_map_) { p.second->RotateBucket(); // Reset call counters. } } else if (old_config->outlier_detection_config().interval != config_->outlier_detection_config().interval) { // Timer interval changed. Cancel the current timer and start a new one // with the same start time. // Note that if the new deadline is in the past, the timer will fire // immediately. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] interval changed, replacing timer", this); } ejection_timer_ = MakeOrphanable(Ref(), ejection_timer_->StartTime()); } // Update subchannel state map. if (args.addresses.ok()) { std::set current_addresses; for (const ServerAddress& address : *args.addresses) { std::string address_key = MakeKeyForAddress(address); auto& subchannel_state = subchannel_state_map_[address_key]; if (subchannel_state == nullptr) { subchannel_state = MakeRefCounted(); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] adding map entry for %s (%p)", this, address_key.c_str(), subchannel_state.get()); } } else if (!config_->CountingEnabled()) { // If counting is not enabled, reset state. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] counting disabled; disabling " "ejection for %s (%p)", this, address_key.c_str(), subchannel_state.get()); } subchannel_state->DisableEjection(); } current_addresses.emplace(address_key); } for (auto it = subchannel_state_map_.begin(); it != subchannel_state_map_.end();) { if (current_addresses.find(it->first) == current_addresses.end()) { // remove each map entry for a subchannel address not in the updated // address list. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] removing map entry for %s (%p)", this, it->first.c_str(), it->second.get()); } it = subchannel_state_map_.erase(it); } else { ++it; } } } // Create child policy if needed. if (child_policy_ == nullptr) { child_policy_ = CreateChildPolicyLocked(args.args); } // Update child policy. UpdateArgs update_args; update_args.addresses = std::move(args.addresses); update_args.resolution_note = std::move(args.resolution_note); update_args.config = config_->child_policy(); // Update the policy. update_args.args = std::move(args.args); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] Updating child policy handler %p", this, child_policy_.get()); } return child_policy_->UpdateLocked(std::move(update_args)); } void OutlierDetectionLb::MaybeUpdatePickerLocked() { if (picker_ != nullptr) { auto outlier_detection_picker = MakeRefCounted(this, picker_, config_->CountingEnabled()); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] updating connectivity: state=%s " "status=(%s) picker=%p", this, ConnectivityStateName(state_), status_.ToString().c_str(), outlier_detection_picker.get()); } channel_control_helper()->UpdateState(state_, status_, std::move(outlier_detection_picker)); } } OrphanablePtr OutlierDetectionLb::CreateChildPolicyLocked( const ChannelArgs& args) { LoadBalancingPolicy::Args lb_policy_args; lb_policy_args.work_serializer = work_serializer(); lb_policy_args.args = args; lb_policy_args.channel_control_helper = std::make_unique(Ref(DEBUG_LOCATION, "Helper")); OrphanablePtr lb_policy = MakeOrphanable(std::move(lb_policy_args), &grpc_outlier_detection_lb_trace); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] Created new child policy handler %p", this, lb_policy.get()); } // Add our interested_parties pollset_set to that of the newly created // child policy. This will make the child policy progress upon activity on // this policy, which in turn is tied to the application's call. grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(), interested_parties()); return lb_policy; } // // OutlierDetectionLb::Helper // RefCountedPtr OutlierDetectionLb::Helper::CreateSubchannel( ServerAddress address, const ChannelArgs& args) { if (outlier_detection_policy_->shutting_down_) return nullptr; std::string key = MakeKeyForAddress(address); RefCountedPtr subchannel_state; auto it = outlier_detection_policy_->subchannel_state_map_.find(key); if (it != outlier_detection_policy_->subchannel_state_map_.end()) { subchannel_state = it->second->Ref(); } auto subchannel = MakeRefCounted( subchannel_state, outlier_detection_policy_->channel_control_helper()->CreateSubchannel( std::move(address), args)); if (subchannel_state != nullptr) { subchannel_state->AddSubchannel(subchannel.get()); } return subchannel; } void OutlierDetectionLb::Helper::UpdateState( grpc_connectivity_state state, const absl::Status& status, RefCountedPtr picker) { if (outlier_detection_policy_->shutting_down_) return; if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] child connectivity state update: " "state=%s (%s) picker=%p", outlier_detection_policy_.get(), ConnectivityStateName(state), status.ToString().c_str(), picker.get()); } // Save the state and picker. outlier_detection_policy_->state_ = state; outlier_detection_policy_->status_ = status; outlier_detection_policy_->picker_ = std::move(picker); // Wrap the picker and return it to the channel. outlier_detection_policy_->MaybeUpdatePickerLocked(); } void OutlierDetectionLb::Helper::RequestReresolution() { if (outlier_detection_policy_->shutting_down_) return; outlier_detection_policy_->channel_control_helper()->RequestReresolution(); } absl::string_view OutlierDetectionLb::Helper::GetAuthority() { return outlier_detection_policy_->channel_control_helper()->GetAuthority(); } grpc_event_engine::experimental::EventEngine* OutlierDetectionLb::Helper::GetEventEngine() { return outlier_detection_policy_->channel_control_helper()->GetEventEngine(); } void OutlierDetectionLb::Helper::AddTraceEvent(TraceSeverity severity, absl::string_view message) { if (outlier_detection_policy_->shutting_down_) return; outlier_detection_policy_->channel_control_helper()->AddTraceEvent(severity, message); } // // OutlierDetectionLb::EjectionTimer // OutlierDetectionLb::EjectionTimer::EjectionTimer( RefCountedPtr parent, Timestamp start_time) : parent_(std::move(parent)), start_time_(start_time) { auto interval = parent_->config_->outlier_detection_config().interval; if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer will run in %s", parent_.get(), interval.ToString().c_str()); } timer_handle_ = parent_->channel_control_helper()->GetEventEngine()->RunAfter( interval, [self = Ref(DEBUG_LOCATION, "EjectionTimer")]() mutable { ApplicationCallbackExecCtx callback_exec_ctx; ExecCtx exec_ctx; auto self_ptr = self.get(); self_ptr->parent_->work_serializer()->Run( [self = std::move(self)]() { self->OnTimerLocked(); }, DEBUG_LOCATION); }); } void OutlierDetectionLb::EjectionTimer::Orphan() { if (timer_handle_.has_value()) { parent_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_); timer_handle_.reset(); } Unref(); } void OutlierDetectionLb::EjectionTimer::OnTimerLocked() { if (!timer_handle_.has_value()) return; timer_handle_.reset(); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejection timer running", parent_.get()); } std::map success_rate_ejection_candidates; std::map failure_percentage_ejection_candidates; size_t ejected_host_count = 0; double success_rate_sum = 0; auto time_now = Timestamp::Now(); auto& config = parent_->config_->outlier_detection_config(); for (auto& state : parent_->subchannel_state_map_) { auto* subchannel_state = state.second.get(); // For each address, swap the call counter's buckets in that address's // map entry. subchannel_state->RotateBucket(); // Gather data to run success rate algorithm or failure percentage // algorithm. if (subchannel_state->ejection_time().has_value()) { ++ejected_host_count; } absl::optional> host_success_rate_and_volume = subchannel_state->GetSuccessRateAndVolume(); if (!host_success_rate_and_volume.has_value()) { continue; } double success_rate = host_success_rate_and_volume->first; uint64_t request_volume = host_success_rate_and_volume->second; if (config.success_rate_ejection.has_value()) { if (request_volume >= config.success_rate_ejection->request_volume) { success_rate_ejection_candidates[subchannel_state] = success_rate; success_rate_sum += success_rate; } } if (config.failure_percentage_ejection.has_value()) { if (request_volume >= config.failure_percentage_ejection->request_volume) { failure_percentage_ejection_candidates[subchannel_state] = success_rate; } } } if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] found %" PRIuPTR " success rate candidates and %" PRIuPTR " failure percentage candidates; ejected_host_count=%" PRIuPTR "; success_rate_sum=%.3f", parent_.get(), success_rate_ejection_candidates.size(), failure_percentage_ejection_candidates.size(), ejected_host_count, success_rate_sum); } // success rate algorithm if (!success_rate_ejection_candidates.empty() && success_rate_ejection_candidates.size() >= config.success_rate_ejection->minimum_hosts) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] running success rate algorithm", parent_.get()); } // calculate ejection threshold: (mean - stdev * // (success_rate_ejection.stdev_factor / 1000)) double mean = success_rate_sum / success_rate_ejection_candidates.size(); double variance = 0; for (const auto& p : success_rate_ejection_candidates) { variance += std::pow(p.second - mean, 2); } variance /= success_rate_ejection_candidates.size(); double stdev = std::sqrt(variance); const double success_rate_stdev_factor = static_cast(config.success_rate_ejection->stdev_factor) / 1000; double ejection_threshold = mean - stdev * success_rate_stdev_factor; if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] stdev=%.3f, ejection_threshold=%.3f", parent_.get(), stdev, ejection_threshold); } for (auto& candidate : success_rate_ejection_candidates) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] checking candidate %p: " "success_rate=%.3f", parent_.get(), candidate.first, candidate.second); } if (candidate.second < ejection_threshold) { uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); double current_percent = 100.0 * ejected_host_count / parent_->subchannel_state_map_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] random_key=%d " "ejected_host_count=%" PRIuPTR " current_percent=%.3f", parent_.get(), random_key, ejected_host_count, current_percent); } if (random_key < config.success_rate_ejection->enforcement_percentage && (ejected_host_count == 0 || (current_percent < config.max_ejection_percent))) { // Eject and record the timestamp for use when ejecting addresses in // this iteration. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", parent_.get()); } candidate.first->Eject(time_now); ++ejected_host_count; } } } } // failure percentage algorithm if (!failure_percentage_ejection_candidates.empty() && failure_percentage_ejection_candidates.size() >= config.failure_percentage_ejection->minimum_hosts) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] running failure percentage algorithm", parent_.get()); } for (auto& candidate : failure_percentage_ejection_candidates) { if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] checking candidate %p: " "success_rate=%.3f", parent_.get(), candidate.first, candidate.second); } // Extra check to make sure success rate algorithm didn't already // eject this backend. if (candidate.first->ejection_time().has_value()) continue; if ((100.0 - candidate.second) > config.failure_percentage_ejection->threshold) { uint32_t random_key = absl::Uniform(bit_gen_, 1, 100); double current_percent = 100.0 * ejected_host_count / parent_->subchannel_state_map_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] random_key=%d " "ejected_host_count=%" PRIuPTR " current_percent=%.3f", parent_.get(), random_key, ejected_host_count, current_percent); } if (random_key < config.failure_percentage_ejection->enforcement_percentage && (ejected_host_count == 0 || (current_percent < config.max_ejection_percent))) { // Eject and record the timestamp for use when ejecting addresses in // this iteration. if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] ejecting candidate", parent_.get()); } candidate.first->Eject(time_now); ++ejected_host_count; } } } } // For each address in the map: // If the address is not ejected and the multiplier is greater than 0, // decrease the multiplier by 1. If the address is ejected, and the // current time is after ejection_timestamp + min(base_ejection_time * // multiplier, max(base_ejection_time, max_ejection_time)), un-eject the // address. for (auto& state : parent_->subchannel_state_map_) { auto* subchannel_state = state.second.get(); const bool unejected = subchannel_state->MaybeUneject( config.base_ejection_time.millis(), config.max_ejection_time.millis()); if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) { gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)", parent_.get(), state.first.c_str(), subchannel_state); } } parent_->ejection_timer_ = MakeOrphanable(parent_, Timestamp::Now()); } // // factory // class OutlierDetectionLbFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return MakeOrphanable(std::move(args)); } absl::string_view name() const override { return kOutlierDetection; } absl::StatusOr> ParseLoadBalancingConfig(const Json& json) const override { if (json.type() == Json::Type::JSON_NULL) { // This policy was configured in the deprecated loadBalancingPolicy // field or in the client API. return absl::InvalidArgumentError( "field:loadBalancingPolicy error:outlier_detection policy requires " "configuration. Please use loadBalancingConfig field of service " "config instead."); } ValidationErrors errors; OutlierDetectionConfig outlier_detection_config; RefCountedPtr child_policy; { outlier_detection_config = LoadFromJson(json, JsonArgs(), &errors); // Parse childPolicy manually. { ValidationErrors::ScopedField field(&errors, ".childPolicy"); auto it = json.object_value().find("childPolicy"); if (it == json.object_value().end()) { errors.AddError("field not present"); } else { auto child_policy_config = CoreConfiguration::Get() .lb_policy_registry() .ParseLoadBalancingConfig(it->second); if (!child_policy_config.ok()) { errors.AddError(child_policy_config.status().message()); } else { child_policy = std::move(*child_policy_config); } } } } if (!errors.ok()) { return errors.status( "errors validating outlier_detection LB policy config"); } return MakeRefCounted(outlier_detection_config, std::move(child_policy)); } }; } // namespace // // OutlierDetectionConfig // const JsonLoaderInterface* OutlierDetectionConfig::SuccessRateEjection::JsonLoader(const JsonArgs&) { static const auto* loader = JsonObjectLoader() .OptionalField("stdevFactor", &SuccessRateEjection::stdev_factor) .OptionalField("enforcementPercentage", &SuccessRateEjection::enforcement_percentage) .OptionalField("minimumHosts", &SuccessRateEjection::minimum_hosts) .OptionalField("requestVolume", &SuccessRateEjection::request_volume) .Finish(); return loader; } void OutlierDetectionConfig::SuccessRateEjection::JsonPostLoad( const Json&, const JsonArgs&, ValidationErrors* errors) { if (enforcement_percentage > 100) { ValidationErrors::ScopedField field(errors, ".enforcement_percentage"); errors->AddError("value must be <= 100"); } } const JsonLoaderInterface* OutlierDetectionConfig::FailurePercentageEjection::JsonLoader(const JsonArgs&) { static const auto* loader = JsonObjectLoader() .OptionalField("threshold", &FailurePercentageEjection::threshold) .OptionalField("enforcementPercentage", &FailurePercentageEjection::enforcement_percentage) .OptionalField("minimumHosts", &FailurePercentageEjection::minimum_hosts) .OptionalField("requestVolume", &FailurePercentageEjection::request_volume) .Finish(); return loader; } void OutlierDetectionConfig::FailurePercentageEjection::JsonPostLoad( const Json&, const JsonArgs&, ValidationErrors* errors) { if (enforcement_percentage > 100) { ValidationErrors::ScopedField field(errors, ".enforcement_percentage"); errors->AddError("value must be <= 100"); } if (threshold > 100) { ValidationErrors::ScopedField field(errors, ".threshold"); errors->AddError("value must be <= 100"); } } const JsonLoaderInterface* OutlierDetectionConfig::JsonLoader(const JsonArgs&) { static const auto* loader = JsonObjectLoader() .OptionalField("interval", &OutlierDetectionConfig::interval) .OptionalField("baseEjectionTime", &OutlierDetectionConfig::base_ejection_time) .OptionalField("maxEjectionTime", &OutlierDetectionConfig::max_ejection_time) .OptionalField("maxEjectionPercent", &OutlierDetectionConfig::max_ejection_percent) .OptionalField("successRateEjection", &OutlierDetectionConfig::success_rate_ejection) .OptionalField("failurePercentageEjection", &OutlierDetectionConfig::failure_percentage_ejection) .Finish(); return loader; } void OutlierDetectionConfig::JsonPostLoad(const Json& json, const JsonArgs&, ValidationErrors* errors) { if (json.object_value().find("maxEjectionTime") == json.object_value().end()) { max_ejection_time = std::max(base_ejection_time, Duration::Seconds(300)); } if (max_ejection_percent > 100) { ValidationErrors::ScopedField field(errors, ".max_ejection_percent"); errors->AddError("value must be <= 100"); } } // // Plugin registration // void RegisterOutlierDetectionLbPolicy(CoreConfiguration::Builder* builder) { builder->lb_policy_registry()->RegisterLoadBalancingPolicyFactory( std::make_unique()); } } // namespace grpc_core