/* * * Copyright 2017 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "src/core/ext/filters/client_channel/retry_throttle.h" #include #include #include #include #include #include #include #include "src/core/lib/avl/avl.h" namespace grpc_core { namespace internal { // // ServerRetryThrottleData // ServerRetryThrottleData::ServerRetryThrottleData( intptr_t max_milli_tokens, intptr_t milli_token_ratio, ServerRetryThrottleData* old_throttle_data) : max_milli_tokens_(max_milli_tokens), milli_token_ratio_(milli_token_ratio) { intptr_t initial_milli_tokens = max_milli_tokens; // If there was a pre-existing entry for this server name, initialize // the token count by scaling proportionately to the old data. This // ensures that if we're already throttling retries on the old scale, // we will start out doing the same thing on the new one. if (old_throttle_data != nullptr) { double token_fraction = static_cast( gpr_atm_acq_load(&old_throttle_data->milli_tokens_)) / static_cast(old_throttle_data->max_milli_tokens_); initial_milli_tokens = static_cast(token_fraction * max_milli_tokens); } gpr_atm_rel_store(&milli_tokens_, static_cast(initial_milli_tokens)); // If there was a pre-existing entry, mark it as stale and give it a // pointer to the new entry, which is its replacement. if (old_throttle_data != nullptr) { Ref().release(); // Ref held by pre-existing entry. gpr_atm_rel_store(&old_throttle_data->replacement_, reinterpret_cast(this)); } } ServerRetryThrottleData::~ServerRetryThrottleData() { ServerRetryThrottleData* replacement = reinterpret_cast( gpr_atm_acq_load(&replacement_)); if (replacement != nullptr) { replacement->Unref(); } } void ServerRetryThrottleData::GetReplacementThrottleDataIfNeeded( ServerRetryThrottleData** throttle_data) { while (true) { ServerRetryThrottleData* new_throttle_data = reinterpret_cast( gpr_atm_acq_load(&(*throttle_data)->replacement_)); if (new_throttle_data == nullptr) return; *throttle_data = new_throttle_data; } } bool ServerRetryThrottleData::RecordFailure() { // First, check if we are stale and need to be replaced. ServerRetryThrottleData* throttle_data = this; GetReplacementThrottleDataIfNeeded(&throttle_data); // We decrement milli_tokens by 1000 (1 token) for each failure. const intptr_t new_value = static_cast(gpr_atm_no_barrier_clamped_add( &throttle_data->milli_tokens_, static_cast(-1000), static_cast(0), static_cast(throttle_data->max_milli_tokens_))); // Retries are allowed as long as the new value is above the threshold // (max_milli_tokens / 2). return new_value > throttle_data->max_milli_tokens_ / 2; } void ServerRetryThrottleData::RecordSuccess() { // First, check if we are stale and need to be replaced. ServerRetryThrottleData* throttle_data = this; GetReplacementThrottleDataIfNeeded(&throttle_data); // We increment milli_tokens by milli_token_ratio for each success. gpr_atm_no_barrier_clamped_add( &throttle_data->milli_tokens_, static_cast(throttle_data->milli_token_ratio_), static_cast(0), static_cast(throttle_data->max_milli_tokens_)); } // // avl vtable for string -> server_retry_throttle_data map // namespace { void* copy_server_name(void* key, void* /*unused*/) { return gpr_strdup(static_cast(key)); } long compare_server_name(void* key1, void* key2, void* /*unused*/) { return strcmp(static_cast(key1), static_cast(key2)); } void destroy_server_retry_throttle_data(void* value, void* /*unused*/) { ServerRetryThrottleData* throttle_data = static_cast(value); throttle_data->Unref(); } void* copy_server_retry_throttle_data(void* value, void* /*unused*/) { ServerRetryThrottleData* throttle_data = static_cast(value); return throttle_data->Ref().release(); } void destroy_server_name(void* key, void* /*unused*/) { gpr_free(key); } const grpc_avl_vtable avl_vtable = { destroy_server_name, copy_server_name, compare_server_name, destroy_server_retry_throttle_data, copy_server_retry_throttle_data}; } // namespace // // ServerRetryThrottleMap // static gpr_mu g_mu; static grpc_avl g_avl; void ServerRetryThrottleMap::Init() { gpr_mu_init(&g_mu); g_avl = grpc_avl_create(&avl_vtable); } void ServerRetryThrottleMap::Shutdown() { gpr_mu_destroy(&g_mu); grpc_avl_unref(g_avl, nullptr); } RefCountedPtr ServerRetryThrottleMap::GetDataForServer( const std::string& server_name, intptr_t max_milli_tokens, intptr_t milli_token_ratio) { RefCountedPtr result; gpr_mu_lock(&g_mu); ServerRetryThrottleData* throttle_data = static_cast( grpc_avl_get(g_avl, const_cast(server_name.c_str()), nullptr)); if (throttle_data == nullptr || throttle_data->max_milli_tokens() != max_milli_tokens || throttle_data->milli_token_ratio() != milli_token_ratio) { // Entry not found, or found with old parameters. Create a new one. result = MakeRefCounted( max_milli_tokens, milli_token_ratio, throttle_data); g_avl = grpc_avl_add(g_avl, gpr_strdup(server_name.c_str()), result->Ref().release(), nullptr); } else { // Entry found. Return a new ref to it. result = throttle_data->Ref(); } gpr_mu_unlock(&g_mu); return result; } } // namespace internal } // namespace grpc_core