// // // Copyright 2017 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // #include "src/core/lib/iomgr/call_combiner.h" #include #include "absl/log/check.h" #include "absl/log/log.h" #include #include "src/core/lib/gprpp/crash.h" #include "src/core/telemetry/stats.h" #include "src/core/telemetry/stats_data.h" namespace grpc_core { namespace { // grpc_error LSB can be used constexpr intptr_t kErrorBit = 1; grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) { if (cancel_state & kErrorBit) { return internal::StatusGetFromHeapPtr(cancel_state & ~kErrorBit); } return absl::OkStatus(); } } // namespace CallCombiner::CallCombiner() { gpr_atm_no_barrier_store(&cancel_state_, 0); gpr_atm_no_barrier_store(&size_, 0); #ifdef GRPC_TSAN_ENABLED GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this, grpc_schedule_on_exec_ctx); #endif } CallCombiner::~CallCombiner() { if (cancel_state_ & kErrorBit) { internal::StatusFreeHeapPtr(cancel_state_ & ~kErrorBit); } } #ifdef GRPC_TSAN_ENABLED void CallCombiner::TsanClosure(void* arg, grpc_error_handle error) { CallCombiner* self = static_cast(arg); // We ref-count the lock, and check if it's already taken. // If it was taken, we should do nothing. Otherwise, we will mark it as // locked. Note that if two different threads try to do this, only one of // them will be able to mark the lock as acquired, while they both run their // callbacks. In such cases (which should never happen for call_combiner), // TSAN will correctly produce an error. // // TODO(soheil): This only covers the callbacks scheduled by // CallCombiner::Start() and CallCombiner::Stop(). // If in the future, a callback gets scheduled using other // mechanisms, we will need to add APIs to externally lock // call combiners. RefCountedPtr lock = self->tsan_lock_; bool prev = false; if (lock->taken.compare_exchange_strong(prev, true)) { TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true); } else { lock.reset(); } Closure::Run(DEBUG_LOCATION, self->original_closure_, error); if (lock != nullptr) { TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true); bool prev = true; CHECK(lock->taken.compare_exchange_strong(prev, false)); } } #endif void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error_handle error) { #ifdef GRPC_TSAN_ENABLED original_closure_ = closure; ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error); #else ExecCtx::Run(DEBUG_LOCATION, closure, error); #endif } #ifndef NDEBUG #define DEBUG_ARGS const char *file, int line, #define DEBUG_FMT_STR "%s:%d: " #define DEBUG_FMT_ARGS , file, line #else #define DEBUG_ARGS #define DEBUG_FMT_STR #define DEBUG_FMT_ARGS #endif void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error, DEBUG_ARGS const char* reason) { GRPC_TRACE_LOG(call_combiner, INFO) << absl::StrFormat( "==> CallCombiner::Start() [%p] closure=%s [" DEBUG_FMT_STR "%s] error=%s", this, closure->DebugString().c_str() DEBUG_FMT_ARGS, reason, StatusToString(error).c_str()); size_t prev_size = static_cast(gpr_atm_full_fetch_add(&size_, (gpr_atm)1)); GRPC_TRACE_LOG(call_combiner, INFO) << " size: " << prev_size << " -> " << prev_size + 1; if (prev_size == 0) { GRPC_TRACE_LOG(call_combiner, INFO) << " EXECUTING IMMEDIATELY"; // Queue was empty, so execute this closure immediately. ScheduleClosure(closure, error); } else { GRPC_TRACE_LOG(call_combiner, INFO) << " QUEUING"; // Queue was not empty, so add closure to queue. closure->error_data.error = internal::StatusAllocHeapPtr(error); queue_.Push( reinterpret_cast(closure)); } } void CallCombiner::Stop(DEBUG_ARGS const char* reason) { GRPC_TRACE_LOG(call_combiner, INFO) << absl::StrFormat("==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]", this DEBUG_FMT_ARGS, reason); size_t prev_size = static_cast(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1)); GRPC_TRACE_LOG(call_combiner, INFO) << " size: " << prev_size << " -> " << prev_size - 1; CHECK_GE(prev_size, 1u); if (prev_size > 1) { while (true) { GRPC_TRACE_LOG(call_combiner, INFO) << " checking queue"; bool empty; grpc_closure* closure = reinterpret_cast(queue_.PopAndCheckEnd(&empty)); if (closure == nullptr) { // This can happen either due to a race condition within the mpscq // code or because of a race with Start(). GRPC_TRACE_LOG(call_combiner, INFO) << " queue returned no result; checking again"; continue; } grpc_error_handle error = internal::StatusMoveFromHeapPtr(closure->error_data.error); closure->error_data.error = 0; GRPC_TRACE_LOG(call_combiner, INFO) << " EXECUTING FROM QUEUE: closure=" << closure->DebugString() << " error=" << StatusToString(error); ScheduleClosure(closure, error); break; } GRPC_TRACE_LOG(call_combiner, INFO) << " queue empty"; } } void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) { while (true) { // Decode original state. gpr_atm original_state = gpr_atm_acq_load(&cancel_state_); grpc_error_handle original_error = DecodeCancelStateError(original_state); // If error is set, invoke the cancellation closure immediately. // Otherwise, store the new closure. if (!original_error.ok()) { GRPC_TRACE_LOG(call_combiner, INFO) << "call_combiner=" << this << ": scheduling notify_on_cancel callback=" << closure << " for pre-existing cancellation"; ExecCtx::Run(DEBUG_LOCATION, closure, original_error); break; } else { if (gpr_atm_full_cas(&cancel_state_, original_state, reinterpret_cast(closure))) { GRPC_TRACE_LOG(call_combiner, INFO) << "call_combiner=" << this << ": setting notify_on_cancel=" << closure; // If we replaced an earlier closure, invoke the original // closure with absl::OkStatus(). This allows callers to clean // up any resources they may be holding for the callback. if (original_state != 0) { closure = reinterpret_cast(original_state); GRPC_TRACE_LOG(call_combiner, INFO) << "call_combiner=" << this << ": scheduling old cancel callback=" << closure; ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus()); } break; } } // cas failed, try again. } } void CallCombiner::Cancel(grpc_error_handle error) { intptr_t status_ptr = internal::StatusAllocHeapPtr(error); gpr_atm new_state = kErrorBit | status_ptr; while (true) { gpr_atm original_state = gpr_atm_acq_load(&cancel_state_); grpc_error_handle original_error = DecodeCancelStateError(original_state); if (!original_error.ok()) { internal::StatusFreeHeapPtr(status_ptr); break; } if (gpr_atm_full_cas(&cancel_state_, original_state, new_state)) { if (original_state != 0) { grpc_closure* notify_on_cancel = reinterpret_cast(original_state); GRPC_TRACE_LOG(call_combiner, INFO) << "call_combiner=" << this << ": scheduling notify_on_cancel callback=" << notify_on_cancel; ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, error); } break; } // cas failed, try again. } } } // namespace grpc_core