/* * * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "absl/strings/match.h" #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" #include "re2/re2.h" #define XXH_INLINE_ALL #include "xxhash.h" #include "src/core/ext/filters/client_channel/config_selector.h" #include "src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.h" #include "src/core/ext/filters/client_channel/resolver_registry.h" #include "src/core/ext/xds/xds_channel_args.h" #include "src/core/ext/xds/xds_client.h" #include "src/core/ext/xds/xds_http_filters.h" #include "src/core/lib/channel/channel_args.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/exec_ctx.h" #include "src/core/lib/transport/timeout_encoding.h" namespace grpc_core { TraceFlag grpc_xds_resolver_trace(false, "xds_resolver"); const char* kXdsClusterAttribute = "xds_cluster_name"; namespace { // // XdsResolver // class XdsResolver : public Resolver { public: explicit XdsResolver(ResolverArgs args) : work_serializer_(std::move(args.work_serializer)), result_handler_(std::move(args.result_handler)), server_name_(absl::StripPrefix(args.uri.path(), "/")), args_(grpc_channel_args_copy(args.args)), interested_parties_(args.pollset_set) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] created for server name %s", this, server_name_.c_str()); } } ~XdsResolver() override { grpc_channel_args_destroy(args_); if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] destroyed", this); } } void StartLocked() override; void ShutdownLocked() override; void ResetBackoffLocked() override { if (xds_client_ != nullptr) xds_client_->ResetBackoff(); } private: class Notifier { public: Notifier(RefCountedPtr resolver, XdsApi::LdsUpdate update); Notifier(RefCountedPtr resolver, XdsApi::RdsUpdate update); Notifier(RefCountedPtr resolver, grpc_error_handle error); explicit Notifier(RefCountedPtr resolver); private: enum Type { kLdsUpdate, kRdsUpdate, kError, kDoesNotExist }; static void RunInExecCtx(void* arg, grpc_error_handle error); void RunInWorkSerializer(grpc_error_handle error); RefCountedPtr resolver_; grpc_closure closure_; XdsApi::LdsUpdate update_; Type type_; }; class ListenerWatcher : public XdsClient::ListenerWatcherInterface { public: explicit ListenerWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} void OnListenerChanged(XdsApi::LdsUpdate listener) override { new Notifier(resolver_, std::move(listener)); } void OnError(grpc_error_handle error) override { new Notifier(resolver_, error); } void OnResourceDoesNotExist() override { new Notifier(resolver_); } private: RefCountedPtr resolver_; }; class RouteConfigWatcher : public XdsClient::RouteConfigWatcherInterface { public: explicit RouteConfigWatcher(RefCountedPtr resolver) : resolver_(std::move(resolver)) {} void OnRouteConfigChanged(XdsApi::RdsUpdate route_config) override { new Notifier(resolver_, std::move(route_config)); } void OnError(grpc_error_handle error) override { new Notifier(resolver_, error); } void OnResourceDoesNotExist() override { new Notifier(resolver_); } private: RefCountedPtr resolver_; }; // An entry in the map of clusters that need to be present in the LB // policy config. The map holds a weak ref. One strong ref is held by // the ConfigSelector, and another is held by each call assigned to // the cluster by the ConfigSelector. The ref for each call is held // until the call is committed. When the strong refs go away, we hop // back into the WorkSerializer to remove the entry from the map. class ClusterState : public DualRefCounted { public: using ClusterStateMap = std::map>; ClusterState(RefCountedPtr resolver, const std::string& cluster_name) : resolver_(std::move(resolver)), it_(resolver_->cluster_state_map_.emplace(cluster_name, WeakRef()) .first) {} void Orphan() override { auto* resolver = resolver_.release(); resolver->work_serializer_->Run( [resolver]() { resolver->MaybeRemoveUnusedClusters(); resolver->Unref(); }, DEBUG_LOCATION); } const std::string& cluster() const { return it_->first; } private: RefCountedPtr resolver_; ClusterStateMap::iterator it_; }; // Call dispatch controller, created for each call handled by the // ConfigSelector. Holds a ref to the ClusterState object until the // call is committed. class XdsCallDispatchController : public ConfigSelector::CallDispatchController { public: explicit XdsCallDispatchController( RefCountedPtr cluster_state) : cluster_state_(std::move(cluster_state)) {} bool ShouldRetry() override { // TODO(donnadionne): Implement the retry circuit breaker here. return true; } void Commit() override { // TODO(donnadionne): If ShouldRetry() was called previously, // decrement the retry circuit breaker counter. cluster_state_.reset(); } private: // Note: The XdsCallDispatchController object is never actually destroyed, // so do not add any data members that require destruction unless you have // some other way to clean them up. RefCountedPtr cluster_state_; }; class XdsConfigSelector : public ConfigSelector { public: XdsConfigSelector(RefCountedPtr resolver, grpc_error_handle* error); ~XdsConfigSelector() override; const char* name() const override { return "XdsConfigSelector"; } bool Equals(const ConfigSelector* other) const override { const auto* other_xds = static_cast(other); // Don't need to compare resolver_, since that will always be the same. return route_table_ == other_xds->route_table_ && clusters_ == other_xds->clusters_; } CallConfig GetCallConfig(GetCallConfigArgs args) override; std::vector GetFilters() override { return filters_; } grpc_channel_args* ModifyChannelArgs(grpc_channel_args* args) override; private: struct Route { struct ClusterWeightState { uint32_t range_end; absl::string_view cluster; RefCountedPtr method_config; bool operator==(const ClusterWeightState& other) const; }; XdsApi::Route route; RefCountedPtr method_config; absl::InlinedVector weighted_cluster_state; bool operator==(const Route& other) const; }; using RouteTable = std::vector; void MaybeAddCluster(const std::string& name); grpc_error_handle CreateMethodConfig( const XdsApi::Route& route, const XdsApi::Route::ClusterWeight* cluster_weight, RefCountedPtr* method_config); RefCountedPtr resolver_; RouteTable route_table_; std::map> clusters_; std::vector filters_; }; void OnListenerUpdate(XdsApi::LdsUpdate listener); void OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update); void OnError(grpc_error_handle error); void OnResourceDoesNotExist(); grpc_error_handle CreateServiceConfig( RefCountedPtr* service_config); void GenerateResult(); void MaybeRemoveUnusedClusters(); std::shared_ptr work_serializer_; std::unique_ptr result_handler_; std::string server_name_; const grpc_channel_args* args_; grpc_pollset_set* interested_parties_; RefCountedPtr xds_client_; XdsClient::ListenerWatcherInterface* listener_watcher_ = nullptr; // This will not contain the RouteConfiguration, even if it comes with the // LDS response; instead, the relevant VirtualHost from the // RouteConfiguration will be saved in current_virtual_host_. XdsApi::LdsUpdate current_listener_; std::string route_config_name_; XdsClient::RouteConfigWatcherInterface* route_config_watcher_ = nullptr; XdsApi::RdsUpdate::VirtualHost current_virtual_host_; ClusterState::ClusterStateMap cluster_state_map_; }; // // XdsResolver::Notifier // XdsResolver::Notifier::Notifier(RefCountedPtr resolver, XdsApi::LdsUpdate update) : resolver_(std::move(resolver)), update_(std::move(update)), type_(kLdsUpdate) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } XdsResolver::Notifier::Notifier(RefCountedPtr resolver, XdsApi::RdsUpdate update) : resolver_(std::move(resolver)), type_(kRdsUpdate) { update_.http_connection_manager.rds_update = std::move(update); GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } XdsResolver::Notifier::Notifier(RefCountedPtr resolver, grpc_error_handle error) : resolver_(std::move(resolver)), type_(kError) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, error); } XdsResolver::Notifier::Notifier(RefCountedPtr resolver) : resolver_(std::move(resolver)), type_(kDoesNotExist) { GRPC_CLOSURE_INIT(&closure_, &RunInExecCtx, this, nullptr); ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE); } void XdsResolver::Notifier::RunInExecCtx(void* arg, grpc_error_handle error) { Notifier* self = static_cast(arg); GRPC_ERROR_REF(error); self->resolver_->work_serializer_->Run( [self, error]() { self->RunInWorkSerializer(error); }, DEBUG_LOCATION); } void XdsResolver::Notifier::RunInWorkSerializer(grpc_error_handle error) { if (resolver_->xds_client_ == nullptr) { GRPC_ERROR_UNREF(error); delete this; return; } switch (type_) { case kLdsUpdate: resolver_->OnListenerUpdate(std::move(update_)); break; case kRdsUpdate: resolver_->OnRouteConfigUpdate( std::move(*update_.http_connection_manager.rds_update)); break; case kError: resolver_->OnError(error); break; case kDoesNotExist: resolver_->OnResourceDoesNotExist(); break; }; delete this; } // // XdsResolver::XdsConfigSelector::Route // bool MethodConfigsEqual(const ServiceConfig* sc1, const ServiceConfig* sc2) { if (sc1 == nullptr) return sc2 == nullptr; if (sc2 == nullptr) return false; return sc1->json_string() == sc2->json_string(); } bool XdsResolver::XdsConfigSelector::Route::ClusterWeightState::operator==( const ClusterWeightState& other) const { return range_end == other.range_end && cluster == other.cluster && MethodConfigsEqual(method_config.get(), other.method_config.get()); } bool XdsResolver::XdsConfigSelector::Route::operator==( const Route& other) const { return route == other.route && weighted_cluster_state == other.weighted_cluster_state && MethodConfigsEqual(method_config.get(), other.method_config.get()); } // // XdsResolver::XdsConfigSelector // XdsResolver::XdsConfigSelector::XdsConfigSelector( RefCountedPtr resolver, grpc_error_handle* error) : resolver_(std::move(resolver)) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] creating XdsConfigSelector %p", resolver_.get(), this); } // 1. Construct the route table // 2 Update resolver's cluster state map // 3. Construct cluster list to hold on to entries in the cluster state // map. // Reserve the necessary entries up-front to avoid reallocation as we add // elements. This is necessary because the string_view in the entry's // weighted_cluster_state field points to the memory in the route field, so // moving the entry in a reallocation will cause the string_view to point to // invalid data. route_table_.reserve(resolver_->current_virtual_host_.routes.size()); for (auto& route : resolver_->current_virtual_host_.routes) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] XdsConfigSelector %p: route: %s", resolver_.get(), this, route.ToString().c_str()); } route_table_.emplace_back(); auto& route_entry = route_table_.back(); route_entry.route = route; // If the route doesn't specify a timeout, set its timeout to the global // one. if (!route.max_stream_duration.has_value()) { route_entry.route.max_stream_duration = resolver_->current_listener_.http_connection_manager .http_max_stream_duration; } if (route.weighted_clusters.empty()) { *error = CreateMethodConfig(route_entry.route, nullptr, &route_entry.method_config); MaybeAddCluster(route.cluster_name); } else { uint32_t end = 0; for (const auto& weighted_cluster : route_entry.route.weighted_clusters) { Route::ClusterWeightState cluster_weight_state; *error = CreateMethodConfig(route_entry.route, &weighted_cluster, &cluster_weight_state.method_config); if (*error != GRPC_ERROR_NONE) return; end += weighted_cluster.weight; cluster_weight_state.range_end = end; cluster_weight_state.cluster = weighted_cluster.name; route_entry.weighted_cluster_state.push_back( std::move(cluster_weight_state)); MaybeAddCluster(weighted_cluster.name); } } } // Populate filter list. for (const auto& http_filter : resolver_->current_listener_.http_connection_manager.http_filters) { // Find filter. This is guaranteed to succeed, because it's checked // at config validation time in the XdsApi code. const XdsHttpFilterImpl* filter_impl = XdsHttpFilterRegistry::GetFilterForType( http_filter.config.config_proto_type_name); GPR_ASSERT(filter_impl != nullptr); // Add C-core filter to list. if (filter_impl->channel_filter() != nullptr) { filters_.push_back(filter_impl->channel_filter()); } } } XdsResolver::XdsConfigSelector::~XdsConfigSelector() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] destroying XdsConfigSelector %p", resolver_.get(), this); } clusters_.clear(); resolver_->MaybeRemoveUnusedClusters(); } const XdsHttpFilterImpl::FilterConfig* FindFilterConfigOverride( const std::string& instance_name, const XdsApi::RdsUpdate::VirtualHost& vhost, const XdsApi::Route& route, const XdsApi::Route::ClusterWeight* cluster_weight) { // Check ClusterWeight, if any. if (cluster_weight != nullptr) { auto it = cluster_weight->typed_per_filter_config.find(instance_name); if (it != cluster_weight->typed_per_filter_config.end()) return &it->second; } // Check Route. auto it = route.typed_per_filter_config.find(instance_name); if (it != route.typed_per_filter_config.end()) return &it->second; // Check VirtualHost. it = vhost.typed_per_filter_config.find(instance_name); if (it != vhost.typed_per_filter_config.end()) return &it->second; // Not found. return nullptr; } grpc_error_handle XdsResolver::XdsConfigSelector::CreateMethodConfig( const XdsApi::Route& route, const XdsApi::Route::ClusterWeight* cluster_weight, RefCountedPtr* method_config) { std::vector fields; // Set retry policy if any. if (route.retry_policy.has_value()) { std::vector retry_parts; retry_parts.push_back(absl::StrFormat( "\"retryPolicy\": {\n" " \"maxAttempts\": %d,\n" " \"initialBackoff\": \"%d.%09ds\",\n" " \"maxBackoff\": \"%d.%09ds\",\n" " \"backoffMultiplier\": 2,\n", route.retry_policy->num_retries + 1, route.retry_policy->retry_back_off.base_interval.seconds, route.retry_policy->retry_back_off.base_interval.nanos, route.retry_policy->retry_back_off.max_interval.seconds, route.retry_policy->retry_back_off.max_interval.nanos)); std::vector code_parts; if (route.retry_policy->retry_on.Contains(GRPC_STATUS_CANCELLED)) { code_parts.push_back(" \"CANCELLED\""); } if (route.retry_policy->retry_on.Contains(GRPC_STATUS_DEADLINE_EXCEEDED)) { code_parts.push_back(" \"DEADLINE_EXCEEDED\""); } if (route.retry_policy->retry_on.Contains(GRPC_STATUS_INTERNAL)) { code_parts.push_back(" \"INTERNAL\""); } if (route.retry_policy->retry_on.Contains(GRPC_STATUS_RESOURCE_EXHAUSTED)) { code_parts.push_back(" \"RESOURCE_EXHAUSTED\""); } if (route.retry_policy->retry_on.Contains(GRPC_STATUS_UNAVAILABLE)) { code_parts.push_back(" \"UNAVAILABLE\""); } retry_parts.push_back( absl::StrFormat(" \"retryableStatusCodes\": [\n %s ]\n", absl::StrJoin(code_parts, ",\n"))); retry_parts.push_back(absl::StrFormat(" }")); fields.emplace_back(absl::StrJoin(retry_parts, "")); } // Set timeout. if (route.max_stream_duration.has_value() && (route.max_stream_duration->seconds != 0 || route.max_stream_duration->nanos != 0)) { fields.emplace_back(absl::StrFormat(" \"timeout\": \"%d.%09ds\"", route.max_stream_duration->seconds, route.max_stream_duration->nanos)); } // Handle xDS HTTP filters. std::map> per_filter_configs; grpc_channel_args* args = grpc_channel_args_copy(resolver_->args_); for (const auto& http_filter : resolver_->current_listener_.http_connection_manager.http_filters) { // Find filter. This is guaranteed to succeed, because it's checked // at config validation time in the XdsApi code. const XdsHttpFilterImpl* filter_impl = XdsHttpFilterRegistry::GetFilterForType( http_filter.config.config_proto_type_name); GPR_ASSERT(filter_impl != nullptr); // If there is not actually any C-core filter associated with this // xDS filter, then it won't need any config, so skip it. if (filter_impl->channel_filter() == nullptr) continue; // Allow filter to add channel args that may affect service config // parsing. args = filter_impl->ModifyChannelArgs(args); // Find config override, if any. const XdsHttpFilterImpl::FilterConfig* config_override = FindFilterConfigOverride(http_filter.name, resolver_->current_virtual_host_, route, cluster_weight); // Generate service config for filter. auto method_config_field = filter_impl->GenerateServiceConfig(http_filter.config, config_override); if (!method_config_field.ok()) { return GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("failed to generate method config for HTTP filter ", http_filter.name, ": ", method_config_field.status().ToString()) .c_str()); } per_filter_configs[method_config_field->service_config_field_name] .push_back(method_config_field->element); } for (const auto& p : per_filter_configs) { fields.emplace_back(absl::StrCat(" \"", p.first, "\": [\n", absl::StrJoin(p.second, ",\n"), "\n ]")); } // Construct service config. grpc_error_handle error = GRPC_ERROR_NONE; if (!fields.empty()) { std::string json = absl::StrCat( "{\n" " \"methodConfig\": [ {\n" " \"name\": [\n" " {}\n" " ],\n" " ", absl::StrJoin(fields, ",\n"), "\n } ]\n" "}"); *method_config = ServiceConfig::Create(args, json.c_str(), &error); } grpc_channel_args_destroy(args); return error; } grpc_channel_args* XdsResolver::XdsConfigSelector::ModifyChannelArgs( grpc_channel_args* args) { return args; } void XdsResolver::XdsConfigSelector::MaybeAddCluster(const std::string& name) { if (clusters_.find(name) == clusters_.end()) { auto it = resolver_->cluster_state_map_.find(name); if (it == resolver_->cluster_state_map_.end()) { auto new_cluster_state = MakeRefCounted(resolver_, name); clusters_[new_cluster_state->cluster()] = std::move(new_cluster_state); } else { clusters_[it->second->cluster()] = it->second->Ref(); } } } absl::optional GetHeaderValue( grpc_metadata_batch* initial_metadata, absl::string_view header_name, std::string* concatenated_value) { // Note: If we ever allow binary headers here, we still need to // special-case ignore "grpc-tags-bin" and "grpc-trace-bin", since // they are not visible to the LB policy in grpc-go. if (absl::EndsWith(header_name, "-bin")) { return absl::nullopt; } else if (header_name == "content-type") { return "application/grpc"; } return grpc_metadata_batch_get_value(initial_metadata, header_name, concatenated_value); } bool HeadersMatch(const std::vector& header_matchers, grpc_metadata_batch* initial_metadata) { for (const auto& header_matcher : header_matchers) { std::string concatenated_value; if (!header_matcher.Match(GetHeaderValue( initial_metadata, header_matcher.name(), &concatenated_value))) { return false; } } return true; } absl::optional HeaderHashHelper( const XdsApi::Route::HashPolicy& policy, grpc_metadata_batch* initial_metadata) { GPR_ASSERT(policy.type == XdsApi::Route::HashPolicy::HEADER); std::string value_buffer; absl::optional header_value = GetHeaderValue(initial_metadata, policy.header_name, &value_buffer); if (!header_value.has_value()) { return absl::nullopt; } if (policy.regex != nullptr) { // If GetHeaderValue() did not already store the value in // value_buffer, copy it there now, so we can modify it. if (header_value->data() != value_buffer.data()) { value_buffer = std::string(*header_value); } RE2::GlobalReplace(&value_buffer, *policy.regex, policy.regex_substitution); header_value = value_buffer; } return XXH64(header_value->data(), header_value->size(), 0); } bool UnderFraction(const uint32_t fraction_per_million) { // Generate a random number in [0, 1000000). const uint32_t random_number = rand() % 1000000; return random_number < fraction_per_million; } ConfigSelector::CallConfig XdsResolver::XdsConfigSelector::GetCallConfig( GetCallConfigArgs args) { for (const auto& entry : route_table_) { // Path matching. if (!entry.route.matchers.path_matcher.Match( StringViewFromSlice(*args.path))) { continue; } // Header Matching. if (!HeadersMatch(entry.route.matchers.header_matchers, args.initial_metadata)) { continue; } // Match fraction check if (entry.route.matchers.fraction_per_million.has_value() && !UnderFraction(entry.route.matchers.fraction_per_million.value())) { continue; } // Found a route match absl::string_view cluster_name; RefCountedPtr method_config; if (entry.route.weighted_clusters.empty()) { cluster_name = entry.route.cluster_name; method_config = entry.method_config; } else { const uint32_t key = rand() % entry.weighted_cluster_state[entry.weighted_cluster_state.size() - 1] .range_end; // Find the index in weighted clusters corresponding to key. size_t mid = 0; size_t start_index = 0; size_t end_index = entry.weighted_cluster_state.size() - 1; size_t index = 0; while (end_index > start_index) { mid = (start_index + end_index) / 2; if (entry.weighted_cluster_state[mid].range_end > key) { end_index = mid; } else if (entry.weighted_cluster_state[mid].range_end < key) { start_index = mid + 1; } else { index = mid + 1; break; } } if (index == 0) index = start_index; GPR_ASSERT(entry.weighted_cluster_state[index].range_end > key); cluster_name = entry.weighted_cluster_state[index].cluster; method_config = entry.weighted_cluster_state[index].method_config; } auto it = clusters_.find(cluster_name); GPR_ASSERT(it != clusters_.end()); // Generate a hash. absl::optional hash; for (const auto& hash_policy : entry.route.hash_policies) { absl::optional new_hash; switch (hash_policy.type) { case XdsApi::Route::HashPolicy::HEADER: new_hash = HeaderHashHelper(hash_policy, args.initial_metadata); break; case XdsApi::Route::HashPolicy::CHANNEL_ID: new_hash = static_cast( reinterpret_cast(resolver_.get())); break; default: GPR_ASSERT(0); } if (new_hash.has_value()) { // Rotating the old value prevents duplicate hash rules from cancelling // each other out and preserves all of the entropy const uint64_t old_value = hash.has_value() ? ((hash.value() << 1) | (hash.value() >> 63)) : 0; hash = old_value ^ new_hash.value(); } // If the policy is a terminal policy and a hash has been generated, // ignore the rest of the hash policies. if (hash_policy.terminal && hash.has_value()) { break; } } if (!hash.has_value()) { // If there is no hash, we just choose a random value as a default. // We cannot directly use the result of rand() as the hash value, // since it is a 32-bit number and not a 64-bit number and will // therefore not be evenly distributed. uint32_t upper = rand(); uint32_t lower = rand(); hash = (static_cast(upper) << 32) | lower; } CallConfig call_config; if (method_config != nullptr) { call_config.method_configs = method_config->GetMethodParsedConfigVector(grpc_empty_slice()); call_config.service_config = std::move(method_config); } call_config.call_attributes[kXdsClusterAttribute] = it->first; std::string hash_string = absl::StrCat(hash.value()); char* hash_value = static_cast(args.arena->Alloc(hash_string.size() + 1)); memcpy(hash_value, hash_string.c_str(), hash_string.size()); hash_value[hash_string.size()] = '\0'; call_config.call_attributes[kRequestRingHashAttribute] = hash_value; call_config.call_dispatch_controller = args.arena->New(it->second->Ref()); return call_config; } return CallConfig(); } // // XdsResolver // void XdsResolver::StartLocked() { grpc_error_handle error = GRPC_ERROR_NONE; xds_client_ = XdsClient::GetOrCreate(args_, &error); if (error != GRPC_ERROR_NONE) { gpr_log(GPR_ERROR, "Failed to create xds client -- channel will remain in " "TRANSIENT_FAILURE: %s", grpc_error_std_string(error).c_str()); result_handler_->ReturnError(error); return; } grpc_pollset_set_add_pollset_set(xds_client_->interested_parties(), interested_parties_); channelz::ChannelNode* parent_channelz_node = grpc_channel_args_find_pointer( args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE); if (parent_channelz_node != nullptr) { xds_client_->AddChannelzLinkage(parent_channelz_node); } auto watcher = absl::make_unique(Ref()); listener_watcher_ = watcher.get(); xds_client_->WatchListenerData(server_name_, std::move(watcher)); } void XdsResolver::ShutdownLocked() { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] shutting down", this); } if (xds_client_ != nullptr) { if (listener_watcher_ != nullptr) { xds_client_->CancelListenerDataWatch(server_name_, listener_watcher_, /*delay_unsubscription=*/false); } if (route_config_watcher_ != nullptr) { xds_client_->CancelRouteConfigDataWatch( server_name_, route_config_watcher_, /*delay_unsubscription=*/false); } channelz::ChannelNode* parent_channelz_node = grpc_channel_args_find_pointer( args_, GRPC_ARG_CHANNELZ_CHANNEL_NODE); if (parent_channelz_node != nullptr) { xds_client_->RemoveChannelzLinkage(parent_channelz_node); } grpc_pollset_set_del_pollset_set(xds_client_->interested_parties(), interested_parties_); xds_client_.reset(); } } void XdsResolver::OnListenerUpdate(XdsApi::LdsUpdate listener) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] received updated listener data", this); } if (listener.http_connection_manager.route_config_name != route_config_name_) { if (route_config_watcher_ != nullptr) { xds_client_->CancelRouteConfigDataWatch( route_config_name_, route_config_watcher_, /*delay_unsubscription=*/ !listener.http_connection_manager.route_config_name.empty()); route_config_watcher_ = nullptr; } route_config_name_ = std::move(listener.http_connection_manager.route_config_name); if (!route_config_name_.empty()) { current_virtual_host_.routes.clear(); auto watcher = absl::make_unique(Ref()); route_config_watcher_ = watcher.get(); xds_client_->WatchRouteConfigData(route_config_name_, std::move(watcher)); } } current_listener_ = std::move(listener); if (route_config_name_.empty()) { GPR_ASSERT( current_listener_.http_connection_manager.rds_update.has_value()); OnRouteConfigUpdate( std::move(*current_listener_.http_connection_manager.rds_update)); } else { // HCM may contain newer filter config. We need to propagate the update as // config selector to the channel GenerateResult(); } } void XdsResolver::OnRouteConfigUpdate(XdsApi::RdsUpdate rds_update) { if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] received updated route config", this); } // Find the relevant VirtualHost from the RouteConfiguration. XdsApi::RdsUpdate::VirtualHost* vhost = rds_update.FindVirtualHostForDomain(server_name_); if (vhost == nullptr) { OnError(GRPC_ERROR_CREATE_FROM_COPIED_STRING( absl::StrCat("could not find VirtualHost for ", server_name_, " in RouteConfiguration") .c_str())); return; } // Save the virtual host in the resolver. current_virtual_host_ = std::move(*vhost); // Send a new result to the channel. GenerateResult(); } void XdsResolver::OnError(grpc_error_handle error) { gpr_log(GPR_ERROR, "[xds_resolver %p] received error from XdsClient: %s", this, grpc_error_std_string(error).c_str()); Result result; grpc_arg new_arg = xds_client_->MakeChannelArg(); result.args = grpc_channel_args_copy_and_add(args_, &new_arg, 1); result.service_config_error = error; result_handler_->ReturnResult(std::move(result)); } void XdsResolver::OnResourceDoesNotExist() { gpr_log(GPR_ERROR, "[xds_resolver %p] LDS/RDS resource does not exist -- clearing " "update and returning empty service config", this); current_virtual_host_.routes.clear(); Result result; result.service_config = ServiceConfig::Create(args_, "{}", &result.service_config_error); GPR_ASSERT(result.service_config != nullptr); result.args = grpc_channel_args_copy(args_); result_handler_->ReturnResult(std::move(result)); } grpc_error_handle XdsResolver::CreateServiceConfig( RefCountedPtr* service_config) { std::vector clusters; for (const auto& cluster : cluster_state_map_) { clusters.push_back( absl::StrFormat(" \"%s\":{\n" " \"childPolicy\":[ {\n" " \"cds_experimental\":{\n" " \"cluster\": \"%s\"\n" " }\n" " } ]\n" " }", cluster.first, cluster.first)); } std::vector config_parts; config_parts.push_back( "{\n" " \"loadBalancingConfig\":[\n" " { \"xds_cluster_manager_experimental\":{\n" " \"children\":{\n"); config_parts.push_back(absl::StrJoin(clusters, ",\n")); config_parts.push_back( " }\n" " } }\n" " ]\n" "}"); std::string json = absl::StrJoin(config_parts, ""); grpc_error_handle error = GRPC_ERROR_NONE; *service_config = ServiceConfig::Create(args_, json.c_str(), &error); return error; } void XdsResolver::GenerateResult() { if (current_virtual_host_.routes.empty()) return; // First create XdsConfigSelector, which may add new entries to the cluster // state map, and then CreateServiceConfig for LB policies. grpc_error_handle error = GRPC_ERROR_NONE; auto config_selector = MakeRefCounted(Ref(), &error); if (error != GRPC_ERROR_NONE) { OnError(error); return; } Result result; error = CreateServiceConfig(&result.service_config); if (error != GRPC_ERROR_NONE) { OnError(error); return; } if (GRPC_TRACE_FLAG_ENABLED(grpc_xds_resolver_trace)) { gpr_log(GPR_INFO, "[xds_resolver %p] generated service config: %s", this, result.service_config->json_string().c_str()); } grpc_arg new_args[] = { xds_client_->MakeChannelArg(), config_selector->MakeChannelArg(), }; result.args = grpc_channel_args_copy_and_add(args_, new_args, GPR_ARRAY_SIZE(new_args)); result_handler_->ReturnResult(std::move(result)); } void XdsResolver::MaybeRemoveUnusedClusters() { bool update_needed = false; for (auto it = cluster_state_map_.begin(); it != cluster_state_map_.end();) { RefCountedPtr cluster_state = it->second->RefIfNonZero(); if (cluster_state != nullptr) { ++it; } else { update_needed = true; it = cluster_state_map_.erase(it); } } if (update_needed && xds_client_ != nullptr) { // Send a new result to the channel. GenerateResult(); } } // // Factory // class XdsResolverFactory : public ResolverFactory { public: bool IsValidUri(const URI& uri) const override { if (GPR_UNLIKELY(!uri.authority().empty())) { gpr_log(GPR_ERROR, "URI authority not supported"); return false; } return true; } OrphanablePtr CreateResolver(ResolverArgs args) const override { if (!IsValidUri(args.uri)) return nullptr; return MakeOrphanable(std::move(args)); } const char* scheme() const override { return "xds"; } }; } // namespace } // namespace grpc_core void grpc_resolver_xds_init() { grpc_core::ResolverRegistry::Builder::RegisterResolverFactory( absl::make_unique()); } void grpc_resolver_xds_shutdown() {}