// // Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #ifndef GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_HEALTH_CHECK_CLIENT_INTERNAL_H #define GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_HEALTH_CHECK_CLIENT_INTERNAL_H #include #include #include #include #include #include #include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include #include "src/core/ext/filters/client_channel/subchannel.h" #include "src/core/ext/filters/client_channel/subchannel_interface_internal.h" #include "src/core/ext/filters/client_channel/subchannel_stream_client.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/unique_type_name.h" #include "src/core/lib/gprpp/work_serializer.h" #include "src/core/lib/iomgr/iomgr_fwd.h" #include "src/core/lib/iomgr/pollset_set.h" #include "src/core/lib/load_balancing/subchannel_interface.h" namespace grpc_core { class HealthWatcher; // This producer is registered with a subchannel. It creates a streaming // health watch call for each health check service name that is being // watched and reports the resulting connectivity state to all // registered watchers. class HealthProducer : public Subchannel::DataProducerInterface { public: HealthProducer() : interested_parties_(grpc_pollset_set_create()) {} ~HealthProducer() override { grpc_pollset_set_destroy(interested_parties_); } void Start(RefCountedPtr subchannel); void Orphan() override; static UniqueTypeName Type() { static UniqueTypeName::Factory kFactory("health_check"); return kFactory.Create(); } UniqueTypeName type() const override { return Type(); } void AddWatcher(HealthWatcher* watcher, const absl::optional& health_check_service_name); void RemoveWatcher( HealthWatcher* watcher, const absl::optional& health_check_service_name); private: class ConnectivityWatcher; // Health checker for a given health check service name. Contains the // health check client and the list of watchers. class HealthChecker : public InternallyRefCounted { public: HealthChecker(WeakRefCountedPtr producer, absl::string_view health_check_service_name); // Disable thread-safety analysis because this method is called via // OrphanablePtr<>, but there's no way to pass the lock annotation // through there. void Orphan() override ABSL_NO_THREAD_SAFETY_ANALYSIS; void AddWatcherLocked(HealthWatcher* watcher) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); // Returns true if this was the last watcher. bool RemoveWatcherLocked(HealthWatcher* watcher) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); // Called when the subchannel's connectivity state changes. void OnConnectivityStateChangeLocked(grpc_connectivity_state state, const absl::Status& status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); private: class HealthStreamEventHandler; // Starts a new stream if we have a connected subchannel. // Called whenever the subchannel transitions to state READY or when a // watcher is added. void StartHealthStreamLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); // Notifies watchers of a new state. // Called while holding the SubchannelStreamClient lock and possibly // the producer lock, so must notify asynchronously, but in guaranteed // order (hence the use of WorkSerializer). void NotifyWatchersLocked(grpc_connectivity_state state, absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&HealthProducer::mu_); // Called by the health check client when receiving an update. void OnHealthWatchStatusChange(grpc_connectivity_state state, const absl::Status& status); WeakRefCountedPtr producer_; absl::string_view health_check_service_name_; std::shared_ptr work_serializer_ = std::make_shared( producer_->subchannel_->event_engine()); absl::optional state_ ABSL_GUARDED_BY(&HealthProducer::mu_); absl::Status status_ ABSL_GUARDED_BY(&HealthProducer::mu_); OrphanablePtr stream_client_ ABSL_GUARDED_BY(&HealthProducer::mu_); std::set watchers_ ABSL_GUARDED_BY(&HealthProducer::mu_); }; // Handles a connectivity state change on the subchannel. void OnConnectivityStateChange(grpc_connectivity_state state, const absl::Status& status); RefCountedPtr subchannel_; ConnectivityWatcher* connectivity_watcher_; grpc_pollset_set* interested_parties_; Mutex mu_; absl::optional state_ ABSL_GUARDED_BY(&mu_); absl::Status status_ ABSL_GUARDED_BY(&mu_); RefCountedPtr connected_subchannel_ ABSL_GUARDED_BY(&mu_); std::map> health_checkers_ ABSL_GUARDED_BY(&mu_); std::set non_health_watchers_ ABSL_GUARDED_BY(&mu_); }; // A data watcher that handles health checking. class HealthWatcher : public InternalSubchannelDataWatcherInterface { public: HealthWatcher( std::shared_ptr work_serializer, absl::optional health_check_service_name, std::unique_ptr watcher) : work_serializer_(std::move(work_serializer)), health_check_service_name_(std::move(health_check_service_name)), watcher_(std::move(watcher)) {} ~HealthWatcher() override; UniqueTypeName type() const override { return HealthProducer::Type(); } // When the client channel sees this wrapper, it will pass it the real // subchannel to use. void SetSubchannel(Subchannel* subchannel) override; // For intercepting the watcher before it gets up to the real subchannel. std::shared_ptr TakeWatcher() { return std::move(watcher_); } void SetWatcher( std::shared_ptr watcher) { watcher_ = std::move(watcher); } void Notify(grpc_connectivity_state state, absl::Status status); grpc_pollset_set* interested_parties() const { return watcher_->interested_parties(); } private: std::shared_ptr work_serializer_; absl::optional health_check_service_name_; std::shared_ptr watcher_; RefCountedPtr producer_; }; } // namespace grpc_core #endif // GRPC_SRC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_HEALTH_CHECK_CLIENT_INTERNAL_H