/* * * Copyright 2015 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "src/core/lib/iomgr/exec_ctx.h" #include #include #include "src/core/lib/iomgr/combiner.h" #include "src/core/lib/iomgr/error.h" static void exec_ctx_run(grpc_closure* closure) { #ifndef NDEBUG closure->scheduled = false; if (grpc_trace_closure.enabled()) { gpr_log(GPR_DEBUG, "running closure %p: created [%s:%d]: %s [%s:%d]", closure, closure->file_created, closure->line_created, closure->run ? "run" : "scheduled", closure->file_initiated, closure->line_initiated); } #endif grpc_error_handle error = grpc_core::internal::StatusMoveFromHeapPtr(closure->error_data.error); closure->error_data.error = 0; closure->cb(closure->cb_arg, std::move(error)); #ifndef NDEBUG if (grpc_trace_closure.enabled()) { gpr_log(GPR_DEBUG, "closure %p finished", closure); } #endif } static void exec_ctx_sched(grpc_closure* closure) { grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(), closure); } namespace grpc_core { GPR_THREAD_LOCAL(ExecCtx*) ExecCtx::exec_ctx_; GPR_THREAD_LOCAL(ApplicationCallbackExecCtx*) ApplicationCallbackExecCtx::callback_exec_ctx_; bool ExecCtx::Flush() { bool did_something = false; for (;;) { if (!grpc_closure_list_empty(closure_list_)) { grpc_closure* c = closure_list_.head; closure_list_.head = closure_list_.tail = nullptr; while (c != nullptr) { grpc_closure* next = c->next_data.next; did_something = true; exec_ctx_run(c); c = next; } } else if (!grpc_combiner_continue_exec_ctx()) { break; } } GPR_ASSERT(combiner_data_.active_combiner == nullptr); return did_something; } void ExecCtx::Run(const DebugLocation& location, grpc_closure* closure, grpc_error_handle error) { (void)location; if (closure == nullptr) { GRPC_ERROR_UNREF(error); return; } #ifndef NDEBUG if (closure->scheduled) { gpr_log(GPR_ERROR, "Closure already scheduled. (closure: %p, created: [%s:%d], " "previously scheduled at: [%s: %d], newly scheduled at [%s: %d]", closure, closure->file_created, closure->line_created, closure->file_initiated, closure->line_initiated, location.file(), location.line()); abort(); } closure->scheduled = true; closure->file_initiated = location.file(); closure->line_initiated = location.line(); closure->run = false; GPR_ASSERT(closure->cb != nullptr); #endif closure->error_data.error = internal::StatusAllocHeapPtr(error); exec_ctx_sched(closure); } void ExecCtx::RunList(const DebugLocation& location, grpc_closure_list* list) { (void)location; grpc_closure* c = list->head; while (c != nullptr) { grpc_closure* next = c->next_data.next; #ifndef NDEBUG if (c->scheduled) { gpr_log(GPR_ERROR, "Closure already scheduled. (closure: %p, created: [%s:%d], " "previously scheduled at: [%s: %d], newly scheduled at [%s:%d]", c, c->file_created, c->line_created, c->file_initiated, c->line_initiated, location.file(), location.line()); abort(); } c->scheduled = true; c->file_initiated = location.file(); c->line_initiated = location.line(); c->run = false; GPR_ASSERT(c->cb != nullptr); #endif exec_ctx_sched(c); c = next; } list->head = list->tail = nullptr; } } // namespace grpc_core