//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H
#define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H

#include <grpc/support/port_platform.h>

#include <map>
#include <memory>
#include <set>
#include <string>

#include "absl/status/status.h"
#include "absl/types/optional.h"

#include <grpc/grpc.h>

#include "src/core/ext/filters/client_channel/client_channel_factory.h"
#include "src/core/ext/filters/client_channel/config_selector.h"
#include "src/core/ext/filters/client_channel/dynamic_filters.h"
#include "src/core/ext/filters/client_channel/lb_policy.h"
#include "src/core/ext/filters/client_channel/resolver.h"
#include "src/core/ext/filters/client_channel/resolver_result_parsing.h"
#include "src/core/ext/filters/client_channel/retry_throttle.h"
#include "src/core/ext/filters/client_channel/service_config.h"
#include "src/core/ext/filters/client_channel/subchannel.h"
#include "src/core/ext/filters/client_channel/subchannel_pool_interface.h"
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/work_serializer.h"
#include "src/core/lib/surface/channel.h"
#include "src/core/lib/transport/connectivity_state.h"

//
// Client channel filter
//
// A client channel is a channel that begins disconnected, and can connect
// to some endpoint on demand.  If that endpoint disconnects, it will be
// connected to again later.
//
// Calls on a disconnected client channel are queued until a connection is
// established.

// Channel arg key for server URI string.
#define GRPC_ARG_SERVER_URI "grpc.server_uri"

// Channel arg containing a pointer to the ClientChannel object.
#define GRPC_ARG_CLIENT_CHANNEL "grpc.internal.client_channel"

// Channel arg containing a pointer to the ServiceConfig object.
#define GRPC_ARG_SERVICE_CONFIG_OBJ "grpc.internal.service_config_obj"

// Max number of batches that can be pending on a call at any given
// time.  This includes one batch for each of the following ops:
//   recv_initial_metadata
//   send_initial_metadata
//   recv_message
//   send_message
//   recv_trailing_metadata
//   send_trailing_metadata
#define MAX_PENDING_BATCHES 6

namespace grpc_core {

class ClientChannel {
 public:
  static const grpc_channel_filter kFilterVtable;

  class LoadBalancedCall;

  // Returns the ClientChannel object from channel, or null if channel
  // is not a client channel.
  static ClientChannel* GetFromChannel(grpc_channel* channel);

  grpc_connectivity_state CheckConnectivityState(bool try_to_connect);
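
  // Usage sketch (illustrative only, not a normative part of this API):
  // assuming "channel" is a grpc_channel* backed by the client-channel
  // stack, code inside C-core can reach this object and poll its state:
  //
  //   ClientChannel* chand = ClientChannel::GetFromChannel(channel);
  //   if (chand != nullptr) {
  //     grpc_connectivity_state state =
  //         chand->CheckConnectivityState(/*try_to_connect=*/false);
  //     // state is one of GRPC_CHANNEL_IDLE, GRPC_CHANNEL_CONNECTING,
  //     // GRPC_CHANNEL_READY, GRPC_CHANNEL_TRANSIENT_FAILURE, or
  //     // GRPC_CHANNEL_SHUTDOWN.
  //   }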

  // Starts a one-time connectivity state watch.  When the channel's state
  // becomes different from *state, sets *state to the new state and
  // schedules on_complete.  The watcher_timer_init callback is invoked as
  // soon as the watch is actually started (i.e., after hopping into the
  // client channel combiner).  I/O will be serviced via pollent.
  //
  // This is intended to be used when starting a watch from outside of C-core
  // via grpc_channel_watch_connectivity_state().  It should not be used
  // by other callers.
  void AddExternalConnectivityWatcher(grpc_polling_entity pollent,
                                      grpc_connectivity_state* state,
                                      grpc_closure* on_complete,
                                      grpc_closure* watcher_timer_init) {
    new ExternalConnectivityWatcher(this, pollent, state, on_complete,
                                    watcher_timer_init);
  }

  // Cancels a pending external watcher previously added by
  // AddExternalConnectivityWatcher().
  void CancelExternalConnectivityWatcher(grpc_closure* on_complete) {
    ExternalConnectivityWatcher::RemoveWatcherFromExternalWatchersMap(
        this, on_complete, /*cancel=*/true);
  }

  int NumExternalConnectivityWatchers() const {
    MutexLock lock(&external_watchers_mu_);
    return static_cast<int>(external_watchers_.size());
  }

  // Starts and stops a connectivity watch.  The watcher will be initially
  // notified as soon as the state changes from initial_state and then on
  // every subsequent state change until either the watch is stopped or
  // it is notified that the state has changed to SHUTDOWN.
  //
  // This is intended to be used when starting watches from code inside of
  // C-core (e.g., for a nested control plane channel for things like xds).
  void AddConnectivityWatcher(
      grpc_connectivity_state initial_state,
      OrphanablePtr<AsyncConnectivityStateWatcherInterface> watcher);
  void RemoveConnectivityWatcher(
      AsyncConnectivityStateWatcherInterface* watcher);

  RefCountedPtr<LoadBalancedCall> CreateLoadBalancedCall(
      const grpc_call_element_args& args, grpc_polling_entity* pollent,
      grpc_closure* on_call_destruction_complete);
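
  // Illustrative use of the external connectivity watch API above (a sketch,
  // not normative; OnStateChanged, OnWatchStarted, watch_state, pollent, and
  // last_seen_state are hypothetical caller-side names):
  //
  //   grpc_closure on_complete;
  //   grpc_closure watcher_timer_init;
  //   GRPC_CLOSURE_INIT(&on_complete, OnStateChanged, /*arg=*/watch_state,
  //                     grpc_schedule_on_exec_ctx);
  //   GRPC_CLOSURE_INIT(&watcher_timer_init, OnWatchStarted,
  //                     /*arg=*/watch_state, grpc_schedule_on_exec_ctx);
  //   chand->AddExternalConnectivityWatcher(pollent, &last_seen_state,
  //                                         &on_complete,
  //                                         &watcher_timer_init);
  //   // If the caller loses interest before on_complete is scheduled:
  //   chand->CancelExternalConnectivityWatcher(&on_complete);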

 private:
  class CallData;
  class ResolverResultHandler;
  class SubchannelWrapper;
  class ClientChannelControlHelper;
  class ConnectivityWatcherAdder;
  class ConnectivityWatcherRemover;

  // Represents a pending connectivity callback from an external caller
  // via grpc_client_channel_watch_connectivity_state().
  class ExternalConnectivityWatcher
      : public ConnectivityStateWatcherInterface {
   public:
    ExternalConnectivityWatcher(ClientChannel* chand,
                                grpc_polling_entity pollent,
                                grpc_connectivity_state* state,
                                grpc_closure* on_complete,
                                grpc_closure* watcher_timer_init);

    ~ExternalConnectivityWatcher() override;

    // Removes the watcher from the external_watchers_ map.
    static void RemoveWatcherFromExternalWatchersMap(ClientChannel* chand,
                                                     grpc_closure* on_complete,
                                                     bool cancel);

    void Notify(grpc_connectivity_state state,
                const absl::Status& /* status */) override;

    void Cancel();

   private:
    // Adds the watcher to state_tracker_.  Consumes the ref that is passed
    // to it from Start().
    void AddWatcherLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);
    void RemoveWatcherLocked()
        ABSL_EXCLUSIVE_LOCKS_REQUIRED(chand_->work_serializer_);

    ClientChannel* chand_;
    grpc_polling_entity pollent_;
    grpc_connectivity_state initial_state_;
    grpc_connectivity_state* state_;
    grpc_closure* on_complete_;
    grpc_closure* watcher_timer_init_;
    Atomic<bool> done_{false};
  };

  struct ResolverQueuedCall {
    grpc_call_element* elem;
    ResolverQueuedCall* next = nullptr;
  };
  struct LbQueuedCall {
    LoadBalancedCall* lb_call;
    LbQueuedCall* next = nullptr;
  };

  ClientChannel(grpc_channel_element_args* args, grpc_error_handle* error);
  ~ClientChannel();

  // Filter vtable functions.
  static grpc_error_handle Init(grpc_channel_element* elem,
                                grpc_channel_element_args* args);
  static void Destroy(grpc_channel_element* elem);
  static void StartTransportOp(grpc_channel_element* elem,
                               grpc_transport_op* op);
  static void GetChannelInfo(grpc_channel_element* elem,
                             const grpc_channel_info* info);

  // Note: Does NOT return a new ref.
  grpc_error_handle disconnect_error() const {
    return disconnect_error_.Load(MemoryOrder::ACQUIRE);
  }

  // Note: All methods with "Locked" suffix must be invoked from within
  // work_serializer_.

  void OnResolverResultChangedLocked(Resolver::Result result)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
  void OnResolverErrorLocked(grpc_error_handle error)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  void CreateOrUpdateLbPolicyLocked(
      RefCountedPtr<LoadBalancingPolicy::Config> lb_policy_config,
      Resolver::Result result) ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
  OrphanablePtr<LoadBalancingPolicy> CreateLbPolicyLocked(
      const grpc_channel_args& args)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  void UpdateStateAndPickerLocked(
      grpc_connectivity_state state, const absl::Status& status,
      const char* reason,
      std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  void UpdateServiceConfigInControlPlaneLocked(
      RefCountedPtr<ServiceConfig> service_config,
      RefCountedPtr<ConfigSelector> config_selector,
      const internal::ClientChannelGlobalParsedConfig* parsed_service_config,
      const char* lb_policy_name)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  void UpdateServiceConfigInDataPlaneLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  void CreateResolverLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);
  void DestroyResolverAndLbPolicyLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  grpc_error_handle DoPingLocked(grpc_transport_op* op)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  void StartTransportOpLocked(grpc_transport_op* op)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  void TryToConnectLocked() ABSL_EXCLUSIVE_LOCKS_REQUIRED(work_serializer_);

  // These methods all require holding resolution_mu_.
  void AddResolverQueuedCall(ResolverQueuedCall* call,
                             grpc_polling_entity* pollent)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);
  void RemoveResolverQueuedCall(ResolverQueuedCall* to_remove,
                                grpc_polling_entity* pollent)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(resolution_mu_);

  // These methods all require holding data_plane_mu_.
  void AddLbQueuedCall(LbQueuedCall* call, grpc_polling_entity* pollent)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
  void RemoveLbQueuedCall(LbQueuedCall* to_remove,
                          grpc_polling_entity* pollent)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);
  RefCountedPtr<ConnectedSubchannel> GetConnectedSubchannelInDataPlane(
      SubchannelInterface* subchannel) const
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(data_plane_mu_);

  //
  // Fields set at construction and never modified.
  //
  const bool deadline_checking_enabled_;
  const bool enable_retries_;
  grpc_channel_stack* owning_stack_;
  ClientChannelFactory* client_channel_factory_;
  const grpc_channel_args* channel_args_;
  RefCountedPtr<ServiceConfig> default_service_config_;
  std::string server_name_;
  UniquePtr<char> target_uri_;
  channelz::ChannelNode* channelz_node_;
  grpc_pollset_set* interested_parties_;

  //
  // Fields related to name resolution.  Guarded by resolution_mu_.
  //
  mutable Mutex resolution_mu_;
  // Linked list of calls queued waiting for resolver result.
  ResolverQueuedCall* resolver_queued_calls_ ABSL_GUARDED_BY(resolution_mu_) =
      nullptr;
  // Data from service config.
  grpc_error_handle resolver_transient_failure_error_
      ABSL_GUARDED_BY(resolution_mu_) = GRPC_ERROR_NONE;
  bool received_service_config_data_ ABSL_GUARDED_BY(resolution_mu_) = false;
  RefCountedPtr<ServiceConfig> service_config_ ABSL_GUARDED_BY(resolution_mu_);
  RefCountedPtr<ConfigSelector> config_selector_
      ABSL_GUARDED_BY(resolution_mu_);
  RefCountedPtr<DynamicFilters> dynamic_filters_
      ABSL_GUARDED_BY(resolution_mu_);
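
  // The queued-call lists above (and the LB-pick list in the data-plane
  // section below) are intrusive singly-linked lists.  A minimal sketch of
  // the push/remove pattern used by AddResolverQueuedCall() and
  // RemoveResolverQueuedCall() (illustrative only; the real implementations
  // also add/remove the call's pollent to/from interested_parties_ so the
  // call keeps getting polled while queued):
  //
  //   void Push(ResolverQueuedCall** head, ResolverQueuedCall* call) {
  //     call->next = *head;
  //     *head = call;
  //   }
  //   void Remove(ResolverQueuedCall** head, ResolverQueuedCall* to_remove) {
  //     for (ResolverQueuedCall** p = head; *p != nullptr; p = &(*p)->next) {
  //       if (*p == to_remove) {
  //         *p = to_remove->next;
  //         return;
  //       }
  //     }
  //   }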

  //
  // Fields used in the data plane.  Guarded by data_plane_mu_.
  //
  mutable Mutex data_plane_mu_;
  std::unique_ptr<LoadBalancingPolicy::SubchannelPicker> picker_
      ABSL_GUARDED_BY(data_plane_mu_);
  // Linked list of calls queued waiting for LB pick.
  LbQueuedCall* lb_queued_calls_ ABSL_GUARDED_BY(data_plane_mu_) = nullptr;

  //
  // Fields used in the control plane.  Guarded by work_serializer.
  //
  std::shared_ptr<WorkSerializer> work_serializer_;
  ConnectivityStateTracker state_tracker_ ABSL_GUARDED_BY(work_serializer_);
  OrphanablePtr<Resolver> resolver_ ABSL_GUARDED_BY(work_serializer_);
  bool previous_resolution_contained_addresses_
      ABSL_GUARDED_BY(work_serializer_) = false;
  RefCountedPtr<ServiceConfig> saved_service_config_
      ABSL_GUARDED_BY(work_serializer_);
  RefCountedPtr<ConfigSelector> saved_config_selector_
      ABSL_GUARDED_BY(work_serializer_);
  absl::optional<std::string> health_check_service_name_
      ABSL_GUARDED_BY(work_serializer_);
  OrphanablePtr<LoadBalancingPolicy> lb_policy_
      ABSL_GUARDED_BY(work_serializer_);
  RefCountedPtr<SubchannelPoolInterface> subchannel_pool_
      ABSL_GUARDED_BY(work_serializer_);
  // The number of SubchannelWrapper instances referencing a given Subchannel.
  std::map<Subchannel*, int> subchannel_refcount_map_
      ABSL_GUARDED_BY(work_serializer_);
  // The set of SubchannelWrappers that currently exist.
  // No need to hold a ref, since the map is updated in the control-plane
  // work_serializer when the SubchannelWrappers are created and destroyed.
  std::set<SubchannelWrapper*> subchannel_wrappers_
      ABSL_GUARDED_BY(work_serializer_);
  // Pending ConnectedSubchannel updates for each SubchannelWrapper.
  // Updates are queued here in the control plane work_serializer and then
  // applied in the data plane mutex when the picker is updated.
  std::map<RefCountedPtr<SubchannelWrapper>,
           RefCountedPtr<ConnectedSubchannel>>
      pending_subchannel_updates_ ABSL_GUARDED_BY(work_serializer_);
  int keepalive_time_ ABSL_GUARDED_BY(work_serializer_) = -1;

  //
  // Fields accessed from both data plane mutex and control plane
  // work_serializer.
  //
  Atomic<grpc_error_handle> disconnect_error_;

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via get_channel_info().
  //
  Mutex info_mu_;
  UniquePtr<char> info_lb_policy_name_ ABSL_GUARDED_BY(info_mu_);
  UniquePtr<char> info_service_config_json_ ABSL_GUARDED_BY(info_mu_);

  //
  // Fields guarded by a mutex, since they need to be accessed
  // synchronously via grpc_channel_num_external_connectivity_watchers().
  //
  mutable Mutex external_watchers_mu_;
  std::map<grpc_closure*, RefCountedPtr<ExternalConnectivityWatcher>>
      external_watchers_ ABSL_GUARDED_BY(external_watchers_mu_);
};
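
// Illustrative note: the info_mu_-guarded fields above back the public
// grpc_channel_get_info() API.  A caller-side sketch (not part of this
// header; "channel" is a hypothetical grpc_channel*, error handling
// omitted):
//
//   char* lb_policy_name = nullptr;
//   char* service_config_json = nullptr;
//   grpc_channel_info info;
//   info.lb_policy_name = &lb_policy_name;
//   info.service_config_json = &service_config_json;
//   grpc_channel_get_info(channel, &info);
//   // ... use the returned strings ...
//   gpr_free(lb_policy_name);
//   gpr_free(service_config_json);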

//
// ClientChannel::LoadBalancedCall
//

// This object is ref-counted, but it cannot inherit from RefCounted<>,
// because it is allocated on the arena and can't free its memory when
// its refcount goes to zero.  So instead, it manually implements the
// same API as RefCounted<>, so that it can be used with RefCountedPtr<>.
class ClientChannel::LoadBalancedCall
    : public RefCounted<LoadBalancedCall, PolymorphicRefCount,
                        kUnrefCallDtor> {
 public:
  // If on_call_destruction_complete is non-null, then it will be
  // invoked once the LoadBalancedCall is completely destroyed.
  // If it is null, then the caller is responsible for checking whether
  // the LB call has a subchannel call and ensuring that the
  // on_call_destruction_complete closure passed down from the surface
  // is not invoked until after the subchannel call stack is destroyed.
  LoadBalancedCall(ClientChannel* chand, const grpc_call_element_args& args,
                   grpc_polling_entity* pollent,
                   grpc_closure* on_call_destruction_complete);
  ~LoadBalancedCall() override;

  void StartTransportStreamOpBatch(grpc_transport_stream_op_batch* batch);

  // Invoked by channel for queued LB picks when the picker is updated.
  static void PickSubchannel(void* arg, grpc_error_handle error);
  // Helper function for performing an LB pick while holding the data plane
  // mutex.  Returns true if the pick is complete, in which case the caller
  // must invoke PickDone() or AsyncPickDone() with the returned error.
  bool PickSubchannelLocked(grpc_error_handle* error)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
  // Schedules a callback to process the completed pick.  The callback
  // will not run until after this method returns.
  void AsyncPickDone(grpc_error_handle error);

  RefCountedPtr<SubchannelCall> subchannel_call() const {
    return subchannel_call_;
  }

 private:
  class LbQueuedCallCanceller;
  class Metadata;
  class LbCallState;

  // Returns the index into pending_batches_ to be used for batch.
  static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch);
  void PendingBatchesAdd(grpc_transport_stream_op_batch* batch);
  static void FailPendingBatchInCallCombiner(void* arg,
                                             grpc_error_handle error);
  // A predicate type and some useful implementations for PendingBatchesFail().
  typedef bool (*YieldCallCombinerPredicate)(
      const CallCombinerClosureList& closures);
  static bool YieldCallCombiner(const CallCombinerClosureList& /*closures*/) {
    return true;
  }
  static bool NoYieldCallCombiner(
      const CallCombinerClosureList& /*closures*/) {
    return false;
  }
  static bool YieldCallCombinerIfPendingBatchesFound(
      const CallCombinerClosureList& closures) {
    return closures.size() > 0;
  }
  // Fails all pending batches.
  // If yield_call_combiner_predicate returns true, assumes responsibility for
  // yielding the call combiner.
  void PendingBatchesFail(
      grpc_error_handle error,
      YieldCallCombinerPredicate yield_call_combiner_predicate);
  static void ResumePendingBatchInCallCombiner(void* arg,
                                               grpc_error_handle ignored);
  // Resumes all pending batches on subchannel_call_.
  void PendingBatchesResume();

  static void RecvTrailingMetadataReadyForLoadBalancingPolicy(
      void* arg, grpc_error_handle error);
  void InjectRecvTrailingMetadataReadyForLoadBalancingPolicy(
      grpc_transport_stream_op_batch* batch);

  void CreateSubchannelCall();
  // Invoked when a pick is completed, on either success or failure.
  static void PickDone(void* arg, grpc_error_handle error);
  // Removes the call from the channel's list of queued picks if present.
  void MaybeRemoveCallFromLbQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
  // Adds the call to the channel's list of queued picks if not already
  // present.
  void MaybeAddCallToLbQueuedCallsLocked()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(&ClientChannel::data_plane_mu_);
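
  // Illustrative sketch of how GetBatchIndex() can map a batch onto one of
  // the MAX_PENDING_BATCHES slots in pending_batches_, one slot per op that
  // can start a batch.  The actual mapping lives in client_channel.cc and
  // may differ:
  //
  //   static size_t GetBatchIndex(grpc_transport_stream_op_batch* batch) {
  //     if (batch->send_initial_metadata) return 0;
  //     if (batch->send_message) return 1;
  //     if (batch->send_trailing_metadata) return 2;
  //     if (batch->recv_initial_metadata) return 3;
  //     if (batch->recv_message) return 4;
  //     if (batch->recv_trailing_metadata) return 5;
  //     GPR_UNREACHABLE_CODE(return (size_t)-1);
  //   }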

  ClientChannel* chand_;

  // TODO(roth): Instead of duplicating these fields in every filter
  // that uses any one of them, we should store them in the call
  // context.  This will save per-call memory overhead.
  grpc_slice path_;  // Request path.
  gpr_cycle_counter call_start_time_;
  grpc_millis deadline_;
  Arena* arena_;
  grpc_call_stack* owning_call_;
  CallCombiner* call_combiner_;
  grpc_call_context_element* call_context_;

  grpc_polling_entity* pollent_;
  grpc_closure* on_call_destruction_complete_;

  // Set when we get a cancel_stream op.
  grpc_error_handle cancel_error_ = GRPC_ERROR_NONE;
  // Set when we fail inside the LB call.
  grpc_error_handle failure_error_ = GRPC_ERROR_NONE;

  grpc_closure pick_closure_;

  // Accessed while holding ClientChannel::data_plane_mu_.
  ClientChannel::LbQueuedCall queued_call_
      ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_);
  bool queued_pending_lb_pick_
      ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = false;
  LbQueuedCallCanceller* lb_call_canceller_
      ABSL_GUARDED_BY(&ClientChannel::data_plane_mu_) = nullptr;

  RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
  const LoadBalancingPolicy::BackendMetricData* backend_metric_data_ = nullptr;
  std::function<void(grpc_error_handle, LoadBalancingPolicy::MetadataInterface*,
                     LoadBalancingPolicy::CallState*)>
      lb_recv_trailing_metadata_ready_;

  RefCountedPtr<SubchannelCall> subchannel_call_;

  // For intercepting recv_trailing_metadata_ready for the LB policy.
  grpc_metadata_batch* recv_trailing_metadata_ = nullptr;
  grpc_closure recv_trailing_metadata_ready_;
  grpc_closure* original_recv_trailing_metadata_ready_ = nullptr;

  // Batches are added to this list when received from above.
  // They are removed when we are done handling the batch (i.e., when
  // either we have invoked all of the batch's callbacks or we have
  // passed the batch down to the subchannel call and are not
  // intercepting any of its callbacks).
  grpc_transport_stream_op_batch* pending_batches_[MAX_PENDING_BATCHES] = {};
};

}  // namespace grpc_core

#endif  // GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_CLIENT_CHANNEL_H