/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_IOMGR_EXEC_CTX_H
#define GRPC_CORE_LIB_IOMGR_EXEC_CTX_H

#include <grpc/support/port_platform.h>

#include <grpc/impl/codegen/grpc_types.h>
#include <grpc/support/atm.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>

#include "src/core/lib/gpr/time_precise.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gprpp/fork.h"
#include "src/core/lib/iomgr/closure.h"

typedef int64_t grpc_millis;

#define GRPC_MILLIS_INF_FUTURE INT64_MAX
#define GRPC_MILLIS_INF_PAST INT64_MIN

/** A combiner represents a list of work to be executed later.
    Forward declared here to avoid a circular dependency with combiner.h. */
typedef struct grpc_combiner grpc_combiner;

/* This exec_ctx is ready to return: either pre-populated, or cached as soon
   as the finish_check returns true */
#define GRPC_EXEC_CTX_FLAG_IS_FINISHED 1
/* The exec_ctx's thread is (potentially) owned by a call or channel: care
   should be given to not delete said call/channel from this exec_ctx */
#define GRPC_EXEC_CTX_FLAG_THREAD_RESOURCE_LOOP 2
/* This exec_ctx was initialized by an internal thread and should not be
   counted by fork handlers */
#define GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 4

/* This application callback exec_ctx was initialized by an internal thread
   and should not be counted by fork handlers */
#define GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD 1

extern grpc_closure_scheduler* grpc_schedule_on_exec_ctx;

gpr_timespec grpc_millis_to_timespec(grpc_millis millis, gpr_clock_type clock);
grpc_millis grpc_timespec_to_millis_round_down(gpr_timespec timespec);
grpc_millis grpc_timespec_to_millis_round_up(gpr_timespec timespec);
grpc_millis grpc_cycle_counter_to_millis_round_down(gpr_cycle_counter cycles);
grpc_millis grpc_cycle_counter_to_millis_round_up(gpr_cycle_counter cycles);
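
/* Example (illustrative only; the gpr_time_* helpers come from
   <grpc/support/time.h>): converting a 5-second deadline into the
   grpc_millis domain. Rounding up is the conservative choice for a
   deadline, since the resulting timer can never fire early:

     gpr_timespec deadline =
         gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
                      gpr_time_from_seconds(5, GPR_TIMESPAN));
     grpc_millis deadline_millis = grpc_timespec_to_millis_round_up(deadline);
 */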
namespace grpc_core {

/** Execution context.
 *  A bag of data that collects information along a callstack.
 *  It is created on the stack at core entry points (public API or iomgr),
 *  and a pointer to it is stored internally as a thread-local variable.
 *
 *  Generally, to create an exec_ctx instance, add the following line at the
 *  top of the public API entry point or at the start of a thread's work
 *  function:
 *
 *  grpc_core::ExecCtx exec_ctx;
 *
 *  Access the created ExecCtx instance using:
 *  grpc_core::ExecCtx::Get()
 *
 *  Specific responsibilities (this may grow in the future):
 *  - track a list of core work that needs to be delayed until the base of
 *    the call stack (this provides a convenient mechanism to run callbacks
 *    without worrying about locking issues)
 *  - provide a decision maker (via IsReadyToFinish) that signals whether a
 *    borrowed thread should continue to do work or should actively try to
 *    finish up and get this thread back to its owner
 *
 *  CONVENTIONS:
 *  - Instances of this must ALWAYS be constructed on the stack, never
 *    heap allocated.
 *  - Do not pass exec_ctx as a parameter to a function. Always access it
 *    using grpc_core::ExecCtx::Get().
 *  - NOTE: In the future, the convention is likely to change to allow only
 *    one ExecCtx on a thread's stack at the same time. The TODO below
 *    discusses this plan in more detail.
 *
 *  TODO(yashykt): Only allow one "active" ExecCtx on a thread at the same
 *  time.
 *  Stage 1: If a new one is created on the stack, it should just
 *  pass-through to the underlying ExecCtx deeper in the thread's stack.
 *  Stage 2: Assert if a 2nd one is ever created on the stack, since that
 *  implies a core re-entry outside of application callbacks.
 */
class ExecCtx {
 public:
  /** Default Constructor */
  ExecCtx() : flags_(GRPC_EXEC_CTX_FLAG_IS_FINISHED) {
    grpc_core::Fork::IncExecCtxCount();
    Set(this);
  }

  /** Parameterised Constructor */
  explicit ExecCtx(uintptr_t fl) : flags_(fl) {
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      grpc_core::Fork::IncExecCtxCount();
    }
    Set(this);
  }

  /** Destructor */
  virtual ~ExecCtx() {
    flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
    Flush();
    Set(last_exec_ctx_);
    if (!(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
      grpc_core::Fork::DecExecCtxCount();
    }
  }

  /** Disallow copy and assignment operators */
  ExecCtx(const ExecCtx&) = delete;
  ExecCtx& operator=(const ExecCtx&) = delete;

  unsigned starting_cpu() const { return starting_cpu_; }

  struct CombinerData {
    /* currently active combiner: updated only via combiner.c */
    grpc_combiner* active_combiner;
    /* last active combiner in the active combiner list */
    grpc_combiner* last_combiner;
  };

  /** Only to be used by grpc-combiner code */
  CombinerData* combiner_data() { return &combiner_data_; }

  /** Return pointer to grpc_closure_list */
  grpc_closure_list* closure_list() { return &closure_list_; }

  /** Return flags */
  uintptr_t flags() { return flags_; }

  /** Checks if there is work to be done */
  bool HasWork() {
    return combiner_data_.active_combiner != nullptr ||
           !grpc_closure_list_empty(closure_list_);
  }

  /** Flush any work that has been enqueued onto this grpc_exec_ctx.
   *  Caller must guarantee that no interfering locks are held.
   *  Returns true if work was performed, false otherwise.
   */
  bool Flush();

  /** Returns true if we'd like to leave this execution context as soon as
   *  possible: useful for deciding whether to do something more or not
   *  depending on outside context.
   */
  bool IsReadyToFinish() {
    if ((flags_ & GRPC_EXEC_CTX_FLAG_IS_FINISHED) == 0) {
      if (CheckReadyToFinish()) {
        flags_ |= GRPC_EXEC_CTX_FLAG_IS_FINISHED;
        return true;
      }
      return false;
    } else {
      return true;
    }
  }

  /** Returns the stored current time relative to start if valid; otherwise
   *  refreshes the stored time, sets it valid and returns the new value.
   */
  grpc_millis Now();

  /** Invalidates the stored time value. A new time value will be set on
   *  calling Now().
   */
  void InvalidateNow() { now_is_valid_ = false; }

  /** To be used only by shutdown code in iomgr */
  void SetNowIomgrShutdown() {
    now_ = GRPC_MILLIS_INF_FUTURE;
    now_is_valid_ = true;
  }

  /** To be used only for testing.
   *  Sets the now value.
   */
  void TestOnlySetNow(grpc_millis new_val) {
    now_ = new_val;
    now_is_valid_ = true;
  }

  static void TestOnlyGlobalInit(gpr_timespec new_val);

  /** Global initialization for ExecCtx. Called by iomgr. */
  static void GlobalInit(void);

  /** Global shutdown for ExecCtx. Called by iomgr. */
  static void GlobalShutdown(void) { gpr_tls_destroy(&exec_ctx_); }

  /** Gets pointer to current exec_ctx. */
  static ExecCtx* Get() {
    return reinterpret_cast<ExecCtx*>(gpr_tls_get(&exec_ctx_));
  }

  /** Sets the thread-local exec_ctx pointer to exec_ctx. */
  static void Set(ExecCtx* exec_ctx) {
    gpr_tls_set(&exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
  }

 protected:
  /** Check if ready to finish. */
  virtual bool CheckReadyToFinish() { return false; }

  /** Disallow delete on ExecCtx. */
  static void operator delete(void* /* p */) { abort(); }

 private:
  grpc_closure_list closure_list_ = GRPC_CLOSURE_LIST_INIT;
  CombinerData combiner_data_ = {nullptr, nullptr};
  uintptr_t flags_;
  unsigned starting_cpu_ = gpr_cpu_current_cpu();

  bool now_is_valid_ = false;
  grpc_millis now_ = 0;

  GPR_TLS_CLASS_DECL(exec_ctx_);
  ExecCtx* last_exec_ctx_ = Get();
};
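
/* Example (illustrative sketch, using only the API above): within one
   ExecCtx scope, Now() is cached, so repeated calls are cheap and mutually
   consistent; call InvalidateNow() after an operation that may block to
   force a fresh reading:

     grpc_core::ExecCtx exec_ctx;
     grpc_millis start = grpc_core::ExecCtx::Get()->Now();
     // ... run closures, possibly blocking in the poller ...
     grpc_core::ExecCtx::Get()->InvalidateNow();
     grpc_millis elapsed = grpc_core::ExecCtx::Get()->Now() - start;

   When exec_ctx goes out of scope, its destructor calls Flush(), draining
   any closures that were enqueued onto it during the scope.
 */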
/** Application-callback execution context.
 *  A bag of data that collects information along a callstack.
 *  It is created on the stack at core entry points, and a pointer to it is
 *  stored internally as a thread-local variable.
 *
 *  There are three key differences between this structure and ExecCtx:
 *  1. ApplicationCallbackExecCtx builds a list of application-level
 *     callbacks, while ExecCtx builds a list of internal callbacks to
 *     invoke.
 *  2. ApplicationCallbackExecCtx invokes its callbacks only at destruction;
 *     there is no explicit Flush method.
 *  3. If more than one ApplicationCallbackExecCtx is created on a thread's
 *     stack, only the one closest to the base of the stack is actually
 *     active, and it is the only one that enqueues application callbacks.
 *     (Unlike ExecCtx, it is not feasible to prevent multiple of these on
 *     the stack, since an executing application callback may itself enter
 *     core. However, the new one created will just pass callbacks through
 *     to the base one, and those will not be executed until the destructor
 *     of the base one runs, preventing unlimited stack growth.)
 *
 *  This structure exists because application callbacks may themselves cause
 *  a core re-entry (e.g., through a public API call), and if that call in
 *  turn causes another application callback, there could be arbitrarily
 *  growing stacks of core re-entries. Instead, application callbacks should
 *  not be invoked until other core work is done and other application
 *  callbacks have completed. To accomplish this, any application callback
 *  should be enqueued using grpc_core::ApplicationCallbackExecCtx::Enqueue.
 *
 *  CONVENTIONS:
 *  - Instances of this must ALWAYS be constructed on the stack, never
 *    heap allocated.
 *  - Instances of this are generally constructed before ExecCtx when needed.
 *    The only exception is for ExecCtxs that are explicitly flushed and
 *    that survive beyond the scope of the function that can cause
 *    application callbacks to be invoked (e.g., in the timer thread).
 *
 *  Generally, core entry points that may trigger application-level callbacks
 *  will have the following declarations:
 *
 *  grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
 *  grpc_core::ExecCtx exec_ctx;
 *
 *  This ordering is important: it ensures that the ApplicationCallbackExecCtx
 *  is destroyed after the ExecCtx (preventing the re-entry problem described
 *  above, and making sure that ExecCtx core callbacks are invoked first).
 *
 */
class ApplicationCallbackExecCtx {
 public:
  /** Default Constructor */
  ApplicationCallbackExecCtx() { Set(this, flags_); }

  /** Parameterised Constructor */
  explicit ApplicationCallbackExecCtx(uintptr_t fl) : flags_(fl) {
    Set(this, flags_);
  }

  ~ApplicationCallbackExecCtx() {
    if (reinterpret_cast<ApplicationCallbackExecCtx*>(
            gpr_tls_get(&callback_exec_ctx_)) == this) {
      while (head_ != nullptr) {
        auto* f = head_;
        head_ = f->internal_next;
        if (f->internal_next == nullptr) {
          tail_ = nullptr;
        }
        (*f->functor_run)(f, f->internal_success);
      }
      gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(nullptr));
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags_)) {
        grpc_core::Fork::DecExecCtxCount();
      }
    } else {
      /* A nested instance: it passed its callbacks through to the base
         instance, so it must not own any of its own. */
      GPR_DEBUG_ASSERT(head_ == nullptr);
      GPR_DEBUG_ASSERT(tail_ == nullptr);
    }
  }

  static void Set(ApplicationCallbackExecCtx* exec_ctx, uintptr_t flags) {
    if (reinterpret_cast<ApplicationCallbackExecCtx*>(
            gpr_tls_get(&callback_exec_ctx_)) == nullptr) {
      if (!(GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD & flags)) {
        grpc_core::Fork::IncExecCtxCount();
      }
      gpr_tls_set(&callback_exec_ctx_, reinterpret_cast<intptr_t>(exec_ctx));
    }
  }

  static void Enqueue(grpc_experimental_completion_queue_functor* functor,
                      int is_success) {
    functor->internal_success = is_success;
    functor->internal_next = nullptr;

    auto* ctx = reinterpret_cast<ApplicationCallbackExecCtx*>(
        gpr_tls_get(&callback_exec_ctx_));

    if (ctx->head_ == nullptr) {
      ctx->head_ = functor;
    }
    if (ctx->tail_ != nullptr) {
      ctx->tail_->internal_next = functor;
    }
    ctx->tail_ = functor;
  }

  /** Global initialization for ApplicationCallbackExecCtx. Called by init. */
  static void GlobalInit(void) { gpr_tls_init(&callback_exec_ctx_); }

  /** Global shutdown for ApplicationCallbackExecCtx. Called by init. */
  static void GlobalShutdown(void) { gpr_tls_destroy(&callback_exec_ctx_); }

 private:
  uintptr_t flags_{0u};
  grpc_experimental_completion_queue_functor* head_{nullptr};
  grpc_experimental_completion_queue_functor* tail_{nullptr};
  GPR_TLS_CLASS_DECL(callback_exec_ctx_);
};
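
/* Example (illustrative sketch; 'operation_complete' and 'on_complete' are
   hypothetical names, not part of this header): core code finishing an
   operation defers the application-supplied functor instead of invoking it
   inline, so a callback that re-enters core cannot grow the stack without
   bound:

     void operation_complete(
         grpc_experimental_completion_queue_functor* on_complete,
         int success) {
       // Queued now; runs when the base ApplicationCallbackExecCtx on this
       // thread is destroyed, after core work has flushed.
       grpc_core::ApplicationCallbackExecCtx::Enqueue(on_complete, success);
     }
 */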
}  // namespace grpc_core

#endif /* GRPC_CORE_LIB_IOMGR_EXEC_CTX_H */