/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ /** Round Robin Policy. * * Before every pick, the \a get_next_ready_subchannel_index_locked function * returns the p->subchannel_list->subchannels index for next subchannel, * respecting the relative order of the addresses provided upon creation or * updates. Note however that updates will start picking from the beginning of * the updated list. */ #include <grpc/support/port_platform.h> #include <string.h> #include <grpc/support/alloc.h> #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_index.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" #include "src/core/lib/transport/static_metadata.h" namespace grpc_core { TraceFlag grpc_lb_round_robin_trace(false, "round_robin"); namespace { // // round_robin LB policy // class RoundRobin : public LoadBalancingPolicy { public: explicit RoundRobin(const Args& args); void UpdateLocked(const grpc_channel_args& args) override; bool PickLocked(PickState* pick) override; void CancelPickLocked(PickState* pick, grpc_error* error) override; void CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error* error) override; void NotifyOnStateChangeLocked(grpc_connectivity_state* state, grpc_closure* closure) override; grpc_connectivity_state CheckConnectivityLocked( grpc_error** connectivity_error) override; void HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) override; void PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) override; void ExitIdleLocked() override; private: ~RoundRobin(); void ShutdownLocked() override; void StartPickingLocked(); size_t GetNextReadySubchannelIndexLocked(); void UpdateLastReadySubchannelIndexLocked(size_t last_ready_index); void UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, grpc_error* error); static void OnConnectivityChangedLocked(void* arg, grpc_error* error); void SubchannelListRefForConnectivityWatch( grpc_lb_subchannel_list* subchannel_list, const char* reason); void SubchannelListUnrefForConnectivityWatch( grpc_lb_subchannel_list* subchannel_list, const char* reason); /** list of subchannels */ grpc_lb_subchannel_list* subchannel_list_ = nullptr; /** have we started picking? */ bool started_picking_ = false; /** are we shutting down? */ bool shutdown_ = false; /** List of picks that are waiting on connectivity */ PickState* pending_picks_ = nullptr; /** our connectivity state tracker */ grpc_connectivity_state_tracker state_tracker_; /** Index into subchannels for last pick. */ size_t last_ready_subchannel_index_ = 0; /** Latest version of the subchannel list. * Subchannel connectivity callbacks will only promote updated subchannel * lists if they equal \a latest_pending_subchannel_list. In other words, * racing callbacks that reference outdated subchannel lists won't perform any * update. */ grpc_lb_subchannel_list* latest_pending_subchannel_list_ = nullptr; }; RoundRobin::RoundRobin(const Args& args) : LoadBalancingPolicy(args) { GPR_ASSERT(args.client_channel_factory != nullptr); grpc_connectivity_state_init(&state_tracker_, GRPC_CHANNEL_IDLE, "round_robin"); UpdateLocked(*args.args); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Created with %" PRIuPTR " subchannels", this, subchannel_list_->num_subchannels); } grpc_subchannel_index_ref(); } RoundRobin::~RoundRobin() { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Destroying Round Robin policy", this); } GPR_ASSERT(subchannel_list_ == nullptr); GPR_ASSERT(latest_pending_subchannel_list_ == nullptr); GPR_ASSERT(pending_picks_ == nullptr); grpc_connectivity_state_destroy(&state_tracker_); grpc_subchannel_index_unref(); } /** Returns the index into p->subchannel_list->subchannels of the next * subchannel in READY state, or p->subchannel_list->num_subchannels if no * subchannel is READY. * * Note that this function does *not* update p->last_ready_subchannel_index. * The caller must do that if it returns a pick. */ size_t RoundRobin::GetNextReadySubchannelIndexLocked() { GPR_ASSERT(subchannel_list_ != nullptr); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_INFO, "[RR %p] getting next ready subchannel (out of %" PRIuPTR "), " "last_ready_subchannel_index=%" PRIuPTR, this, subchannel_list_->num_subchannels, last_ready_subchannel_index_); } for (size_t i = 0; i < subchannel_list_->num_subchannels; ++i) { const size_t index = (i + last_ready_subchannel_index_ + 1) % subchannel_list_->num_subchannels; if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] checking subchannel %p, subchannel_list %p, index %" PRIuPTR ": state=%s", this, subchannel_list_->subchannels[index].subchannel, subchannel_list_, index, grpc_connectivity_state_name( subchannel_list_->subchannels[index].curr_connectivity_state)); } if (subchannel_list_->subchannels[index].curr_connectivity_state == GRPC_CHANNEL_READY) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] found next ready subchannel (%p) at index %" PRIuPTR " of subchannel_list %p", this, subchannel_list_->subchannels[index].subchannel, index, subchannel_list_); } return index; } } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] no subchannels in ready state", this); } return subchannel_list_->num_subchannels; } // Sets last_ready_subchannel_index_ to last_ready_index. void RoundRobin::UpdateLastReadySubchannelIndexLocked(size_t last_ready_index) { GPR_ASSERT(last_ready_index < subchannel_list_->num_subchannels); last_ready_subchannel_index_ = last_ready_index; if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] setting last_ready_subchannel_index=%" PRIuPTR " (SC %p, CSC %p)", this, last_ready_index, subchannel_list_->subchannels[last_ready_index].subchannel, subchannel_list_->subchannels[last_ready_index] .connected_subchannel.get()); } } void RoundRobin::HandOffPendingPicksLocked(LoadBalancingPolicy* new_policy) { PickState* pick; while ((pick = pending_picks_) != nullptr) { pending_picks_ = pick->next; if (new_policy->PickLocked(pick)) { // Synchronous return, schedule closure. GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } } } void RoundRobin::ShutdownLocked() { grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Channel shutdown"); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down", this); } shutdown_ = true; PickState* pick; while ((pick = pending_picks_) != nullptr) { pending_picks_ = pick->next; pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_REF(error)); } grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_SHUTDOWN, GRPC_ERROR_REF(error), "rr_shutdown"); if (subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "sl_shutdown_rr_shutdown"); subchannel_list_ = nullptr; } if (latest_pending_subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( latest_pending_subchannel_list_, "sl_shutdown_pending_rr_shutdown"); latest_pending_subchannel_list_ = nullptr; } TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_CANCELLED); GRPC_ERROR_UNREF(error); } void RoundRobin::CancelPickLocked(PickState* pick, grpc_error* error) { PickState* pp = pending_picks_; pending_picks_ = nullptr; while (pp != nullptr) { PickState* next = pp->next; if (pp == pick) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { pp->next = pending_picks_; pending_picks_ = pp; } pp = next; } GRPC_ERROR_UNREF(error); } void RoundRobin::CancelMatchingPicksLocked(uint32_t initial_metadata_flags_mask, uint32_t initial_metadata_flags_eq, grpc_error* error) { PickState* pick = pending_picks_; pending_picks_ = nullptr; while (pick != nullptr) { PickState* next = pick->next; if ((pick->initial_metadata_flags & initial_metadata_flags_mask) == initial_metadata_flags_eq) { pick->connected_subchannel.reset(); GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING( "Pick Cancelled", &error, 1)); } else { pick->next = pending_picks_; pending_picks_ = pick; } pick = next; } GRPC_ERROR_UNREF(error); } void RoundRobin::SubchannelListRefForConnectivityWatch( grpc_lb_subchannel_list* subchannel_list, const char* reason) { // TODO(roth): We currently track this ref manually. Once the new // ClosureRef API is ready and the subchannel_list code has been // converted to a C++ API, find a way to hold the RefCountedPtr<> // somewhere (maybe in the subchannel_data object) instead of doing // this manually. auto self = Ref(DEBUG_LOCATION, reason); self.release(); grpc_lb_subchannel_list_ref(subchannel_list, reason); } void RoundRobin::SubchannelListUnrefForConnectivityWatch( grpc_lb_subchannel_list* subchannel_list, const char* reason) { Unref(DEBUG_LOCATION, reason); grpc_lb_subchannel_list_unref(subchannel_list, reason); } void RoundRobin::StartPickingLocked() { started_picking_ = true; for (size_t i = 0; i < subchannel_list_->num_subchannels; i++) { if (subchannel_list_->subchannels[i].subchannel != nullptr) { SubchannelListRefForConnectivityWatch(subchannel_list_, "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( &subchannel_list_->subchannels[i]); } } } void RoundRobin::ExitIdleLocked() { if (!started_picking_) { StartPickingLocked(); } } bool RoundRobin::PickLocked(PickState* pick) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Trying to pick (shutdown: %d)", this, shutdown_); } GPR_ASSERT(!shutdown_); if (subchannel_list_ != nullptr) { const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); if (next_ready_index < subchannel_list_->num_subchannels) { /* readily available, report right away */ grpc_lb_subchannel_data* sd = &subchannel_list_->subchannels[next_ready_index]; pick->connected_subchannel = sd->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = sd->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] Picked target <-- Subchannel %p (connected %p) (sl %p, " "index %" PRIuPTR ")", this, sd->subchannel, pick->connected_subchannel.get(), sd->subchannel_list, next_ready_index); } /* only advance the last picked pointer if the selection was used */ UpdateLastReadySubchannelIndexLocked(next_ready_index); return true; } } /* no pick currently available. Save for later in list of pending picks */ if (!started_picking_) { StartPickingLocked(); } pick->next = pending_picks_; pending_picks_ = pick; return false; } void UpdateStateCountersLocked(grpc_lb_subchannel_data* sd) { grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; GPR_ASSERT(sd->prev_connectivity_state != GRPC_CHANNEL_SHUTDOWN); GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_SHUTDOWN); if (sd->prev_connectivity_state == GRPC_CHANNEL_READY) { GPR_ASSERT(subchannel_list->num_ready > 0); --subchannel_list->num_ready; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { GPR_ASSERT(subchannel_list->num_transient_failures > 0); --subchannel_list->num_transient_failures; } else if (sd->prev_connectivity_state == GRPC_CHANNEL_IDLE) { GPR_ASSERT(subchannel_list->num_idle > 0); --subchannel_list->num_idle; } sd->prev_connectivity_state = sd->curr_connectivity_state; if (sd->curr_connectivity_state == GRPC_CHANNEL_READY) { ++subchannel_list->num_ready; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { ++subchannel_list->num_transient_failures; } else if (sd->curr_connectivity_state == GRPC_CHANNEL_IDLE) { ++subchannel_list->num_idle; } } /** Sets the policy's connectivity status based on that of the passed-in \a sd * (the grpc_lb_subchannel_data associated with the updated subchannel) and the * subchannel list \a sd belongs to (sd->subchannel_list). \a error will be used * only if the policy transitions to state TRANSIENT_FAILURE. */ void RoundRobin::UpdateConnectivityStatusLocked(grpc_lb_subchannel_data* sd, grpc_error* error) { /* In priority order. The first rule to match terminates the search (ie, if we * are on rule n, all previous rules were unfulfilled). * * 1) RULE: ANY subchannel is READY => policy is READY. * CHECK: subchannel_list->num_ready > 0. * * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING. * CHECK: sd->curr_connectivity_state == CONNECTING. * * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is * TRANSIENT_FAILURE. * CHECK: subchannel_list->num_transient_failures == * subchannel_list->num_subchannels. */ grpc_lb_subchannel_list* subchannel_list = sd->subchannel_list; GPR_ASSERT(sd->curr_connectivity_state != GRPC_CHANNEL_IDLE); if (subchannel_list->num_ready > 0) { /* 1) READY */ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_READY, GRPC_ERROR_NONE, "rr_ready"); } else if (sd->curr_connectivity_state == GRPC_CHANNEL_CONNECTING) { /* 2) CONNECTING */ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE, "rr_connecting"); } else if (subchannel_list->num_transient_failures == subchannel_list->num_subchannels) { /* 3) TRANSIENT_FAILURE */ grpc_connectivity_state_set(&state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error), "rr_exhausted_subchannels"); } GRPC_ERROR_UNREF(error); } void RoundRobin::OnConnectivityChangedLocked(void* arg, grpc_error* error) { grpc_lb_subchannel_data* sd = static_cast<grpc_lb_subchannel_data*>(arg); RoundRobin* p = static_cast<RoundRobin*>(sd->subchannel_list->policy); if (grpc_lb_round_robin_trace.enabled()) { gpr_log( GPR_DEBUG, "[RR %p] connectivity changed for subchannel %p, subchannel_list %p: " "prev_state=%s new_state=%s p->shutdown=%d " "sd->subchannel_list->shutting_down=%d error=%s", p, sd->subchannel, sd->subchannel_list, grpc_connectivity_state_name(sd->prev_connectivity_state), grpc_connectivity_state_name(sd->pending_connectivity_state_unsafe), p->shutdown_, sd->subchannel_list->shutting_down, grpc_error_string(error)); } GPR_ASSERT(sd->subchannel != nullptr); // If the policy is shutting down, unref and return. if (p->shutdown_) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_shutdown"); p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, "rr_shutdown"); return; } // If the subchannel list is shutting down, stop watching. if (sd->subchannel_list->shutting_down || error == GRPC_ERROR_CANCELLED) { grpc_lb_subchannel_data_stop_connectivity_watch(sd); grpc_lb_subchannel_data_unref_subchannel(sd, "rr_sl_shutdown"); p->SubchannelListUnrefForConnectivityWatch(sd->subchannel_list, "rr_sl_shutdown"); return; } // If we're still here, the notification must be for a subchannel in // either the current or latest pending subchannel lists. GPR_ASSERT(sd->subchannel_list == p->subchannel_list_ || sd->subchannel_list == p->latest_pending_subchannel_list_); GPR_ASSERT(sd->pending_connectivity_state_unsafe != GRPC_CHANNEL_SHUTDOWN); // Now that we're inside the combiner, copy the pending connectivity // state (which was set by the connectivity state watcher) to // curr_connectivity_state, which is what we use inside of the combiner. sd->curr_connectivity_state = sd->pending_connectivity_state_unsafe; // If the sd's new state is TRANSIENT_FAILURE, unref the *connected* // subchannel, if any. switch (sd->curr_connectivity_state) { case GRPC_CHANNEL_TRANSIENT_FAILURE: { sd->connected_subchannel.reset(); if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. " "Requesting re-resolution", p, sd->subchannel); } p->TryReresolutionLocked(&grpc_lb_round_robin_trace, GRPC_ERROR_NONE); break; } case GRPC_CHANNEL_READY: { if (sd->connected_subchannel == nullptr) { sd->connected_subchannel = grpc_subchannel_get_connected_subchannel(sd->subchannel); } if (sd->subchannel_list != p->subchannel_list_) { // promote sd->subchannel_list to p->subchannel_list_. // sd->subchannel_list must be equal to // p->latest_pending_subchannel_list_ because we have already filtered // for sds belonging to outdated subchannel lists. GPR_ASSERT(sd->subchannel_list == p->latest_pending_subchannel_list_); GPR_ASSERT(!sd->subchannel_list->shutting_down); if (grpc_lb_round_robin_trace.enabled()) { const size_t num_subchannels = p->subchannel_list_ != nullptr ? p->subchannel_list_->num_subchannels : 0; gpr_log(GPR_DEBUG, "[RR %p] phasing out subchannel list %p (size %" PRIuPTR ") in favor of %p (size %" PRIuPTR ")", p, p->subchannel_list_, num_subchannels, sd->subchannel_list, num_subchannels); } if (p->subchannel_list_ != nullptr) { // dispose of the current subchannel_list grpc_lb_subchannel_list_shutdown_and_unref(p->subchannel_list_, "sl_phase_out_shutdown"); } p->subchannel_list_ = p->latest_pending_subchannel_list_; p->latest_pending_subchannel_list_ = nullptr; } /* at this point we know there's at least one suitable subchannel. Go * ahead and pick one and notify the pending suitors in * p->pending_picks. This preemptively replicates rr_pick()'s actions. */ const size_t next_ready_index = p->GetNextReadySubchannelIndexLocked(); GPR_ASSERT(next_ready_index < p->subchannel_list_->num_subchannels); grpc_lb_subchannel_data* selected = &p->subchannel_list_->subchannels[next_ready_index]; if (p->pending_picks_ != nullptr) { // if the selected subchannel is going to be used for the pending // picks, update the last picked pointer p->UpdateLastReadySubchannelIndexLocked(next_ready_index); } PickState* pick; while ((pick = p->pending_picks_)) { p->pending_picks_ = pick->next; pick->connected_subchannel = selected->connected_subchannel; if (pick->user_data != nullptr) { *pick->user_data = selected->user_data; } if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Fulfilling pending pick. Target <-- subchannel %p " "(subchannel_list %p, index %" PRIuPTR ")", p, selected->subchannel, p->subchannel_list_, next_ready_index); } GRPC_CLOSURE_SCHED(pick->on_complete, GRPC_ERROR_NONE); } break; } case GRPC_CHANNEL_SHUTDOWN: GPR_UNREACHABLE_CODE(return ); case GRPC_CHANNEL_CONNECTING: case GRPC_CHANNEL_IDLE:; // fallthrough } // Update state counters. UpdateStateCountersLocked(sd); // Only update connectivity based on the selected subchannel list. if (sd->subchannel_list == p->subchannel_list_) { p->UpdateConnectivityStatusLocked(sd, GRPC_ERROR_REF(error)); } // Renew notification. grpc_lb_subchannel_data_start_connectivity_watch(sd); } grpc_connectivity_state RoundRobin::CheckConnectivityLocked( grpc_error** error) { return grpc_connectivity_state_get(&state_tracker_, error); } void RoundRobin::NotifyOnStateChangeLocked(grpc_connectivity_state* current, grpc_closure* notify) { grpc_connectivity_state_notify_on_state_change(&state_tracker_, current, notify); } void RoundRobin::PingOneLocked(grpc_closure* on_initiate, grpc_closure* on_ack) { const size_t next_ready_index = GetNextReadySubchannelIndexLocked(); if (next_ready_index < subchannel_list_->num_subchannels) { grpc_lb_subchannel_data* selected = &subchannel_list_->subchannels[next_ready_index]; selected->connected_subchannel->Ping(on_initiate, on_ack); } else { GRPC_CLOSURE_SCHED(on_initiate, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); GRPC_CLOSURE_SCHED(on_ack, GRPC_ERROR_CREATE_FROM_STATIC_STRING( "Round Robin not connected")); } } void RoundRobin::UpdateLocked(const grpc_channel_args& args) { const grpc_arg* arg = grpc_channel_args_find(&args, GRPC_ARG_LB_ADDRESSES); if (arg == nullptr || arg->type != GRPC_ARG_POINTER) { gpr_log(GPR_ERROR, "[RR %p] update provided no addresses; ignoring", this); // If we don't have a current subchannel list, go into TRANSIENT_FAILURE. // Otherwise, keep using the current subchannel list (ignore this update). if (subchannel_list_ == nullptr) { grpc_connectivity_state_set( &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Missing update in args"), "rr_update_missing"); } return; } grpc_lb_addresses* addresses = (grpc_lb_addresses*)arg->value.pointer.p; if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] received update with %" PRIuPTR " addresses", this, addresses->num_addresses); } grpc_lb_subchannel_list* subchannel_list = grpc_lb_subchannel_list_create( this, &grpc_lb_round_robin_trace, addresses, combiner(), client_channel_factory(), args, &RoundRobin::OnConnectivityChangedLocked); if (subchannel_list->num_subchannels == 0) { grpc_connectivity_state_set( &state_tracker_, GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"), "rr_update_empty"); if (subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref(subchannel_list_, "sl_shutdown_empty_update"); } subchannel_list_ = subchannel_list; // empty list return; } if (started_picking_) { for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { const grpc_connectivity_state subchannel_state = grpc_subchannel_check_connectivity( subchannel_list->subchannels[i].subchannel, nullptr); // Override the default setting of IDLE for connectivity notification // purposes if the subchannel is already in transient failure. Otherwise // we'd be immediately notified of the IDLE-TRANSIENT_FAILURE // discrepancy, attempt to re-resolve and end up here again. // TODO(roth): As part of C++-ifying the subchannel_list API, design a // better API for notifying the LB policy of subchannel states, which can // be used both for the subchannel's initial state and for subsequent // state changes. This will allow us to handle this more generally instead // of special-casing TRANSIENT_FAILURE (e.g., we can also distribute any // pending picks across all READY subchannels rather than sending them all // to the first one). if (subchannel_state == GRPC_CHANNEL_TRANSIENT_FAILURE) { subchannel_list->subchannels[i].pending_connectivity_state_unsafe = subchannel_list->subchannels[i].curr_connectivity_state = subchannel_list->subchannels[i].prev_connectivity_state = subchannel_state; --subchannel_list->num_idle; ++subchannel_list->num_transient_failures; } } if (latest_pending_subchannel_list_ != nullptr) { if (grpc_lb_round_robin_trace.enabled()) { gpr_log(GPR_DEBUG, "[RR %p] Shutting down latest pending subchannel list %p, " "about to be replaced by newer latest %p", this, latest_pending_subchannel_list_, subchannel_list); } grpc_lb_subchannel_list_shutdown_and_unref( latest_pending_subchannel_list_, "sl_outdated"); } latest_pending_subchannel_list_ = subchannel_list; for (size_t i = 0; i < subchannel_list->num_subchannels; ++i) { /* Watch every new subchannel. A subchannel list becomes active the * moment one of its subchannels is READY. At that moment, we swap * p->subchannel_list for sd->subchannel_list, provided the subchannel * list is still valid (ie, isn't shutting down) */ SubchannelListRefForConnectivityWatch(subchannel_list, "connectivity_watch"); grpc_lb_subchannel_data_start_connectivity_watch( &subchannel_list->subchannels[i]); } } else { // The policy isn't picking yet. Save the update for later, disposing of // previous version if any. if (subchannel_list_ != nullptr) { grpc_lb_subchannel_list_shutdown_and_unref( subchannel_list_, "rr_update_before_started_picking"); } subchannel_list_ = subchannel_list; } } // // factory // class RoundRobinFactory : public LoadBalancingPolicyFactory { public: OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy( const LoadBalancingPolicy::Args& args) const override { return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(args)); } const char* name() const override { return "round_robin"; } }; } // namespace } // namespace grpc_core void grpc_lb_policy_round_robin_init() { grpc_core::LoadBalancingPolicyRegistry::Builder:: RegisterLoadBalancingPolicyFactory( grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>( grpc_core::New<grpc_core::RoundRobinFactory>())); } void grpc_lb_policy_round_robin_shutdown() {}