/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ /** Round Robin Policy. * * Before every pick, the \a get_next_ready_subchannel_index_locked function * returns the p->subchannel_list->subchannels index for next subchannel, * respecting the relative order of the addresses provided upon creation or * updates. Note however that updates will start picking from the beginning of * the updated list. */ #include #include #include #include #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" namespace grpc_core { TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); namespace { // // round_robin LB policy // constexpr char kRoundRobin[] = "round_robin"; class RoundRobin : public LoadBalancingPolicy { public: explicit RoundRobin(Args args); const char* name() const override { return kRoundRobin; } void UpdateLocked(UpdateArgs args) override; void ResetBackoffLocked() override; private: ~RoundRobin(); // Forward declaration. class RoundRobinSubchannelList; // Data for a particular subchannel in a subchannel list. // This subclass adds the following functionality: // - Tracks the previous connectivity state of the subchannel, so that // we know how many subchannels are in each state. class RoundRobinSubchannelData : public SubchannelData { public: RoundRobinSubchannelData( SubchannelList* subchannel_list, const ServerAddress& address, RefCountedPtr subchannel) : SubchannelData(subchannel_list, address, std::move(subchannel)) {} grpc_connectivity_state connectivity_state() const { return last_connectivity_state_; } void UpdateConnectivityStateLocked( grpc_connectivity_state connectivity_state); private: void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) override; grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE; }; // A list of subchannels. class RoundRobinSubchannelList : public SubchannelList { public: RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer, const ServerAddressList& addresses, grpc_combiner* combiner, const grpc_channel_args& args) : SubchannelList(policy, tracer, addresses, combiner, policy->channel_control_helper(), args) { // Need to maintain a ref to the LB policy as long as we maintain // any references to subchannels, since the subchannels' // pollset_sets will include the LB policy's pollset_set. policy->Ref(DEBUG_LOCATION, "subchannel_list").release(); } ~RoundRobinSubchannelList() { RoundRobin* p = static_cast(policy()); p->Unref(DEBUG_LOCATION, "subchannel_list"); } // Starts watching the subchannels in this list. void StartWatchingLocked(); // Updates the counters of subchannels in each state when a // subchannel transitions from old_state to new_state. void UpdateStateCountersLocked(grpc_connectivity_state old_state, grpc_connectivity_state new_state); // If this subchannel list is the RR policy's current subchannel // list, updates the RR policy's connectivity state based on the // subchannel list's state counters. void MaybeUpdateRoundRobinConnectivityStateLocked(); // Updates the RR policy's overall state based on the counters of // subchannels in each state. void UpdateRoundRobinStateFromSubchannelStateCountsLocked(); private: size_t num_ready_ = 0; size_t num_connecting_ = 0; size_t num_transient_failure_ = 0; }; class Picker : public SubchannelPicker { public: Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list); PickResult Pick(PickArgs args) override; private: // Using pointer value only, no ref held -- do not dereference! RoundRobin* parent_; size_t last_picked_index_; InlinedVector, 10> subchannels_; }; void ShutdownLocked() override; /** list of subchannels */ OrphanablePtr subchannel_list_; /** Latest version of the subchannel list. * Subchannel connectivity callbacks will only promote updated subchannel * lists if they equal \a latest_pending_subchannel_list. In other words, * racing callbacks that reference outdated subchannel lists won't perform any * update. */ OrphanablePtr latest_pending_subchannel_list_; /** are we shutting down? */ bool shutdown_ = false; }; // // RoundRobin::Picker // RoundRobin::Picker::Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list) : parent_(parent) { for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) { auto* connected_subchannel = subchannel_list->subchannel(i)->connected_subchannel(); if (connected_subchannel != nullptr) { subchannels_.push_back(connected_subchannel->Ref()); } } // For discussion on why we generate a random starting index for // the picker, see https://github.com/grpc/grpc-go/issues/2580. // TODO(roth): rand(3) is not thread-safe. This should be replaced with // something better as part of https://github.com/grpc/grpc/issues/17891. last_picked_index_ = rand() % subchannels_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p picker %p] created picker from subchannel_list=%p " "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR, parent_, this, subchannel_list, subchannels_.size(), last_picked_index_); } } RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs args) { last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size(); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p picker %p] returning index %" PRIuPTR ", connected_subchannel=%p", parent_, this, last_picked_index_, subchannels_[last_picked_index_].get()); } PickResult result; result.type = PickResult::PICK_COMPLETE; result.connected_subchannel = subchannels_[last_picked_index_]; return result; } // // RoundRobin // RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Created", this); } } RoundRobin::~RoundRobin() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this); } GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); } void RoundRobin::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Shutting down", this); } shutdown_ = true; subchannel_list_.reset(); latest_pending_subchannel_list_.reset(); } void RoundRobin::ResetBackoffLocked() { subchannel_list_->ResetBackoffLocked(); if (latest_pending_subchannel_list_ != nullptr) { latest_pending_subchannel_list_->ResetBackoffLocked(); } } void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() { if (num_subchannels() == 0) return; // Check current state of each subchannel synchronously, since any // subchannel already used by some other channel may have a non-IDLE // state. for (size_t i = 0; i < num_subchannels(); ++i) { grpc_connectivity_state state = subchannel(i)->CheckConnectivityStateLocked(); if (state != GRPC_CHANNEL_IDLE) { subchannel(i)->UpdateConnectivityStateLocked(state); } } // Start connectivity watch for each subchannel. for (size_t i = 0; i < num_subchannels(); i++) { if (subchannel(i)->subchannel() != nullptr) { subchannel(i)->StartConnectivityWatchLocked(); subchannel(i)->subchannel()->AttemptToConnect(); } } // Now set the LB policy's state based on the subchannels' states. UpdateRoundRobinStateFromSubchannelStateCountsLocked(); } void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked( grpc_connectivity_state old_state, grpc_connectivity_state new_state) { GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN); if (old_state == GRPC_CHANNEL_READY) { GPR_ASSERT(num_ready_ > 0); --num_ready_; } else if (old_state == GRPC_CHANNEL_CONNECTING) { GPR_ASSERT(num_connecting_ > 0); --num_connecting_; } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(num_transient_failure_ > 0); --num_transient_failure_; } if (new_state == GRPC_CHANNEL_READY) { ++num_ready_; } else if (new_state == GRPC_CHANNEL_CONNECTING) { ++num_connecting_; } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++num_transient_failure_; } } // Sets the RR policy's connectivity state and generates a new picker based // on the current subchannel list. void RoundRobin::RoundRobinSubchannelList:: MaybeUpdateRoundRobinConnectivityStateLocked() { RoundRobin* p = static_cast(policy()); // Only set connectivity state if this is the current subchannel list. if (p->subchannel_list_.get() != this) return; /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * * 1) RULE: ANY subchannel is READY => policy is READY. * CHECK: subchannel_list->num_ready > 0. * * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. * CHECK: sd->curr_connectivity_state == CONNECTING. * * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is * TRANSIENT_FAILURE. * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ if (num_ready_ > 0) { /* 1) READY */ p->channel_control_helper()->UpdateState( GRPC_CHANNEL_READY, UniquePtr(New(p, this))); } else if (num_connecting_ > 0) { /* 2) CONNECTING */ p->channel_control_helper()->UpdateState( GRPC_CHANNEL_CONNECTING, UniquePtr(New( p->Ref(DEBUG_LOCATION, "QueuePicker")))); } else if (num_transient_failure_ == num_subchannels()) { /* 3) TRANSIENT_FAILURE */ grpc_error* error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING( "connections to all backends failing"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); p->channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, UniquePtr(New(error))); } } void RoundRobin::RoundRobinSubchannelList:: UpdateRoundRobinStateFromSubchannelStateCountsLocked() { RoundRobin* p = static_cast(policy()); if (num_ready_ > 0) { if (p->subchannel_list_.get() != this) { // Promote this list to p->subchannel_list_. // This list must be p->latest_pending_subchannel_list_, because // any previous update would have been shut down already and // therefore we would not be receiving a notification for them. GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this); GPR_ASSERT(!shutting_down()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { const size_t old_num_subchannels = p->subchannel_list_ != nullptr ? p->subchannel_list_->num_subchannels() : 0; gpr_log(GPR_INFO, "[RR %p] phasing out subchannel list %p (size %" PRIuPTR ") in favor of %p (size %" PRIuPTR ")", p, p->subchannel_list_.get(), old_num_subchannels, this, num_subchannels()); } p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_); } } // Update the RR policy's connectivity state if needed. MaybeUpdateRoundRobinConnectivityStateLocked(); } void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked( grpc_connectivity_state connectivity_state) { RoundRobin* p = static_cast(subchannel_list()->policy()); if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log( GPR_INFO, "[RR %p] connectivity changed for subchannel %p, subchannel_list %p " "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s", p, subchannel(), subchannel_list(), Index(), subchannel_list()->num_subchannels(), grpc_connectivity_state_name(last_connectivity_state_), grpc_connectivity_state_name(connectivity_state)); } subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_, connectivity_state); last_connectivity_state_ = connectivity_state; } void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) { RoundRobin* p = static_cast(subchannel_list()->policy()); GPR_ASSERT(subchannel() != nullptr); // If the new state is TRANSIENT_FAILURE, re-resolve. // Only do this if we've started watching, not at startup time. // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE // when the subchannel list was created, we'd wind up in a constant // loop of re-resolution. // Also attempt to reconnect. if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " "Requesting re-resolution", p, subchannel()); } p->channel_control_helper()->RequestReresolution(); subchannel()->AttemptToConnect(); } // Update state counters. UpdateConnectivityStateLocked(connectivity_state); // Update overall state and renew notification. subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked(); } void RoundRobin::UpdateLocked(UpdateArgs args) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses", this, args.addresses.size()); } // Replace latest_pending_subchannel_list_. if (latest_pending_subchannel_list_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) { gpr_log(GPR_INFO, "[RR %p] Shutting down previous pending subchannel list %p", this, latest_pending_subchannel_list_.get()); } } latest_pending_subchannel_list_ = MakeOrphanable( this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args); if (latest_pending_subchannel_list_->num_subchannels() == 0) { // If the new list is empty, immediately promote the new list to the // current list and transition to TRANSIENT_FAILURE. grpc_error* error = grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE); channel_control_helper()->UpdateState( GRPC_CHANNEL_TRANSIENT_FAILURE, UniquePtr(New(error))); subchannel_list_ = std::move(latest_pending_subchannel_list_); } else if (subchannel_list_ == nullptr) { // If there is no current list, immediately promote the new list to // the current list and start watching it. subchannel_list_ = std::move(latest_pending_subchannel_list_); subchannel_list_->StartWatchingLocked(); } else { // Start watching the pending list. It will get swapped into the // current list when it reports READY. latest_pending_subchannel_list_->StartWatchingLocked(); } } class ParsedRoundRobinConfig : public LoadBalancingPolicy::Config { public: const char* name() const override { return kRoundRobin; } }; // // factory // class RoundRobinFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr CreateLoadBalancingPolicy( LoadBalancingPolicy::Args args) const override { return OrphanablePtr(New(std::move(args))); } const char* name() const override { return kRoundRobin; } RefCountedPtr ParseLoadBalancingConfig( const grpc_json* json, grpc_error** error) const override { if (json != nullptr) { GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0); } return RefCountedPtr( New()); } }; } // namespace } // namespace grpc_core void grpc_lb_policy_round_robin_init() { grpc_core::LoadBalancingPolicyRegistry::Builder:: RegisterLoadBalancingPolicyFactory( grpc_core::UniquePtr( grpc_core::New())); } void grpc_lb_policy_round_robin_shutdown() {}