/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H #include #include #include #include "absl/container/inlined_vector.h" #include "src/core/ext/filters/client_channel/lb_policy_registry.h" #include "src/core/ext/filters/client_channel/server_address.h" // TODO(roth): Should not need the include of subchannel.h here, since // that implementation should be hidden from the LB policy API. #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_interface.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/debug/trace.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/sockaddr_utils.h" #include "src/core/lib/transport/connectivity_state.h" // Code for maintaining a list of subchannels within an LB policy. // // To use this, callers must create their own subclasses, like so: /* class MySubchannelList; // Forward declaration. class MySubchannelData : public SubchannelData { public: void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) override { // ...code to handle connectivity changes... } }; class MySubchannelList : public SubchannelList { }; */ // All methods will be called from within the client_channel work serializer. namespace grpc_core { // Forward declaration. template class SubchannelList; // Stores data for a particular subchannel in a subchannel list. // Callers must create a subclass that implements the // ProcessConnectivityChangeLocked() method. template class SubchannelData { public: // Returns a pointer to the subchannel list containing this object. SubchannelListType* subchannel_list() const { return static_cast(subchannel_list_); } // Returns the index into the subchannel list of this object. size_t Index() const { return static_cast(static_cast(this) - subchannel_list_->subchannel(0)); } // Returns a pointer to the subchannel. SubchannelInterface* subchannel() const { return subchannel_.get(); } // Synchronously checks the subchannel's connectivity state. // Must not be called while there is a connectivity notification // pending (i.e., between calling StartConnectivityWatchLocked() and // calling CancelConnectivityWatchLocked()). grpc_connectivity_state CheckConnectivityStateLocked() { GPR_ASSERT(pending_watcher_ == nullptr); connectivity_state_ = subchannel_->CheckConnectivityState(); return connectivity_state_; } // Resets the connection backoff. // TODO(roth): This method should go away when we move the backoff // code out of the subchannel and into the LB policies. void ResetBackoffLocked(); // Starts watching the connectivity state of the subchannel. // ProcessConnectivityChangeLocked() will be called whenever the // connectivity state changes. void StartConnectivityWatchLocked(); // Cancels watching the connectivity state of the subchannel. void CancelConnectivityWatchLocked(const char* reason); // Cancels any pending connectivity watch and unrefs the subchannel. void ShutdownLocked(); protected: SubchannelData( SubchannelList* subchannel_list, const ServerAddress& address, RefCountedPtr subchannel); virtual ~SubchannelData(); // After StartConnectivityWatchLocked() is called, this method will be // invoked whenever the subchannel's connectivity state changes. // To stop watching, use CancelConnectivityWatchLocked(). virtual void ProcessConnectivityChangeLocked( grpc_connectivity_state connectivity_state) = 0; private: // Watcher for subchannel connectivity state. class Watcher : public SubchannelInterface::ConnectivityStateWatcherInterface { public: Watcher( SubchannelData* subchannel_data, RefCountedPtr subchannel_list) : subchannel_data_(subchannel_data), subchannel_list_(std::move(subchannel_list)) {} ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); } void OnConnectivityStateChange(grpc_connectivity_state new_state) override; grpc_pollset_set* interested_parties() override { return subchannel_list_->policy()->interested_parties(); } private: SubchannelData* subchannel_data_; RefCountedPtr subchannel_list_; }; // Unrefs the subchannel. void UnrefSubchannelLocked(const char* reason); // Backpointer to owning subchannel list. Not owned. SubchannelList* subchannel_list_; // The subchannel. RefCountedPtr subchannel_; // Will be non-null when the subchannel's state is being watched. SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ = nullptr; // Data updated by the watcher. grpc_connectivity_state connectivity_state_; }; // A list of subchannels. template class SubchannelList : public InternallyRefCounted { public: typedef absl::InlinedVector SubchannelVector; // The number of subchannels in the list. size_t num_subchannels() const { return subchannels_.size(); } // The data for the subchannel at a particular index. SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; } // Returns true if the subchannel list is shutting down. bool shutting_down() const { return shutting_down_; } // Accessors. LoadBalancingPolicy* policy() const { return policy_; } TraceFlag* tracer() const { return tracer_; } // Resets connection backoff of all subchannels. // TODO(roth): We will probably need to rethink this as part of moving // the backoff code out of subchannels and into LB policies. void ResetBackoffLocked(); void Orphan() override { ShutdownLocked(); InternallyRefCounted::Unref(DEBUG_LOCATION, "shutdown"); } protected: SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer, const ServerAddressList& addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args); virtual ~SubchannelList(); private: // For accessing Ref() and Unref(). friend class SubchannelData; void ShutdownLocked(); // Backpointer to owning policy. LoadBalancingPolicy* policy_; TraceFlag* tracer_; // The list of subchannels. SubchannelVector subchannels_; // Is this list shutting down? This may be true due to the shutdown of the // policy itself or because a newer update has arrived while this one hadn't // finished processing. bool shutting_down_ = false; }; // // implementation -- no user-servicable parts below // // // SubchannelData::Watcher // template void SubchannelData::Watcher:: OnConnectivityStateChange(grpc_connectivity_state new_state) { if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): connectivity changed: state=%s, " "shutting_down=%d, pending_watcher=%p", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_.get(), subchannel_data_->Index(), subchannel_list_->num_subchannels(), subchannel_data_->subchannel_.get(), ConnectivityStateName(new_state), subchannel_list_->shutting_down(), subchannel_data_->pending_watcher_); } if (!subchannel_list_->shutting_down() && subchannel_data_->pending_watcher_ != nullptr) { subchannel_data_->connectivity_state_ = new_state; // Call the subclass's ProcessConnectivityChangeLocked() method. subchannel_data_->ProcessConnectivityChangeLocked(new_state); } } // // SubchannelData // template SubchannelData::SubchannelData( SubchannelList* subchannel_list, const ServerAddress& /*address*/, RefCountedPtr subchannel) : subchannel_list_(subchannel_list), subchannel_(std::move(subchannel)), // We assume that the current state is IDLE. If not, we'll get a // callback telling us that. connectivity_state_(GRPC_CHANNEL_IDLE) {} template SubchannelData::~SubchannelData() { GPR_ASSERT(subchannel_ == nullptr); } template void SubchannelData:: UnrefSubchannelLocked(const char* reason) { if (subchannel_ != nullptr) { if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): unreffing subchannel (%s)", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_.get(), reason); } subchannel_.reset(); } } template void SubchannelData::ResetBackoffLocked() { if (subchannel_ != nullptr) { subchannel_->ResetBackoff(); } } template void SubchannelData::StartConnectivityWatchLocked() { if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): starting watch (from %s)", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_.get(), ConnectivityStateName(connectivity_state_)); } GPR_ASSERT(pending_watcher_ == nullptr); pending_watcher_ = new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher")); subchannel_->WatchConnectivityState( connectivity_state_, std::unique_ptr( pending_watcher_)); } template void SubchannelData:: CancelConnectivityWatchLocked(const char* reason) { if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR " (subchannel %p): canceling connectivity watch (%s)", subchannel_list_->tracer()->name(), subchannel_list_->policy(), subchannel_list_, Index(), subchannel_list_->num_subchannels(), subchannel_.get(), reason); } if (pending_watcher_ != nullptr) { subchannel_->CancelConnectivityStateWatch(pending_watcher_); pending_watcher_ = nullptr; } } template void SubchannelData::ShutdownLocked() { if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown"); UnrefSubchannelLocked("shutdown"); } // // SubchannelList // template SubchannelList::SubchannelList( LoadBalancingPolicy* policy, TraceFlag* tracer, const ServerAddressList& addresses, LoadBalancingPolicy::ChannelControlHelper* helper, const grpc_channel_args& args) : InternallyRefCounted(tracer), policy_(policy), tracer_(tracer) { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels", tracer_->name(), policy, this, addresses.size()); } subchannels_.reserve(addresses.size()); // We need to remove the LB addresses in order to be able to compare the // subchannel keys of subchannels from a different batch of addresses. // We remove the service config, since it will be passed into the // subchannel via call context. static const char* keys_to_remove[] = {GRPC_ARG_SUBCHANNEL_ADDRESS, GRPC_ARG_SERVICE_CONFIG}; // Create a subchannel for each address. for (size_t i = 0; i < addresses.size(); i++) { absl::InlinedVector args_to_add; const size_t subchannel_address_arg_index = args_to_add.size(); args_to_add.emplace_back( Subchannel::CreateSubchannelAddressArg(&addresses[i].address())); if (addresses[i].args() != nullptr) { for (size_t j = 0; j < addresses[i].args()->num_args; ++j) { args_to_add.emplace_back(addresses[i].args()->args[j]); } } grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove( &args, keys_to_remove, GPR_ARRAY_SIZE(keys_to_remove), args_to_add.data(), args_to_add.size()); gpr_free(args_to_add[subchannel_address_arg_index].value.string); RefCountedPtr subchannel = helper->CreateSubchannel(*new_args); grpc_channel_args_destroy(new_args); if (subchannel == nullptr) { // Subchannel could not be created. if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] could not create subchannel for address uri %s, " "ignoring", tracer_->name(), policy_, grpc_sockaddr_to_uri(&addresses[i].address()).c_str()); } continue; } if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] subchannel list %p index %" PRIuPTR ": Created subchannel %p for address uri %s", tracer_->name(), policy_, this, subchannels_.size(), subchannel.get(), grpc_sockaddr_to_uri(&addresses[i].address()).c_str()); } subchannels_.emplace_back(this, addresses[i], std::move(subchannel)); } } template SubchannelList::~SubchannelList() { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(), policy_, this); } } template void SubchannelList::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) { gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p", tracer_->name(), policy_, this); } GPR_ASSERT(!shutting_down_); shutting_down_ = true; for (size_t i = 0; i < subchannels_.size(); i++) { SubchannelDataType* sd = &subchannels_[i]; sd->ShutdownLocked(); } } template void SubchannelList::ResetBackoffLocked() { for (size_t i = 0; i < subchannels_.size(); i++) { SubchannelDataType* sd = &subchannels_[i]; sd->ResetBackoffLocked(); } } } // namespace grpc_core #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */