// Copyright 2021 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef GRPC_CORE_LIB_PROMISE_ACTIVITY_H #define GRPC_CORE_LIB_PROMISE_ACTIVITY_H #include #include #include #include #include #include #include "absl/base/thread_annotations.h" #include "absl/status/status.h" #include "absl/types/optional.h" #include "absl/types/variant.h" #include #include "src/core/lib/gpr/tls.h" #include "src/core/lib/gprpp/construct_destruct.h" #include "src/core/lib/gprpp/no_destruct.h" #include "src/core/lib/gprpp/orphanable.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/context.h" #include "src/core/lib/promise/detail/promise_factory.h" #include "src/core/lib/promise/detail/status.h" #include "src/core/lib/promise/poll.h" namespace grpc_core { // A Wakeable object is used by queues to wake activities. class Wakeable { public: // Wake up the underlying activity. // After calling, this Wakeable cannot be used again. virtual void Wakeup() = 0; // Drop this wakeable without waking up the underlying activity. virtual void Drop() = 0; protected: inline ~Wakeable() {} }; namespace activity_detail { struct Unwakeable final : public Wakeable { void Wakeup() override {} void Drop() override {} }; static Unwakeable* unwakeable() { return NoDestructSingleton::Get(); } } // namespace activity_detail class AtomicWaker; // An owning reference to a Wakeable. // This type is non-copyable but movable. class Waker { public: explicit Waker(Wakeable* wakeable) : wakeable_(wakeable) {} Waker() : Waker(activity_detail::unwakeable()) {} ~Waker() { wakeable_->Drop(); } Waker(const Waker&) = delete; Waker& operator=(const Waker&) = delete; Waker(Waker&& other) noexcept : wakeable_(other.Take()) {} Waker& operator=(Waker&& other) noexcept { std::swap(wakeable_, other.wakeable_); return *this; } // Wake the underlying activity. void Wakeup() { Take()->Wakeup(); } template friend H AbslHashValue(H h, const Waker& w) { return H::combine(std::move(h), w.wakeable_); } bool operator==(const Waker& other) const noexcept { return wakeable_ == other.wakeable_; } private: friend class AtomicWaker; Wakeable* Take() { return std::exchange(wakeable_, activity_detail::unwakeable()); } Wakeable* wakeable_; }; // An atomic variant of Waker - this type is non-copyable and non-movable. class AtomicWaker { public: explicit AtomicWaker(Wakeable* wakeable) : wakeable_(wakeable) {} AtomicWaker() : AtomicWaker(activity_detail::unwakeable()) {} explicit AtomicWaker(Waker waker) : AtomicWaker(waker.Take()) {} ~AtomicWaker() { wakeable_.load(std::memory_order_acquire)->Drop(); } AtomicWaker(const AtomicWaker&) = delete; AtomicWaker& operator=(const AtomicWaker&) = delete; AtomicWaker(AtomicWaker&& other) noexcept = delete; AtomicWaker& operator=(AtomicWaker&& other) noexcept = delete; // Wake the underlying activity. void Wakeup() { Take()->Wakeup(); } // Return true if there is a not-unwakeable wakeable present. bool Armed() const noexcept { return wakeable_.load(std::memory_order_relaxed) != activity_detail::unwakeable(); } // Set to some new waker void Set(Waker waker) { wakeable_.exchange(waker.Take(), std::memory_order_acq_rel)->Wakeup(); } private: Wakeable* Take() { return wakeable_.exchange(activity_detail::unwakeable(), std::memory_order_acq_rel); } std::atomic wakeable_; }; // An Activity tracks execution of a single promise. // It executes the promise under a mutex. // When the promise stalls, it registers the containing activity to be woken up // later. // The activity takes a callback, which will be called exactly once with the // result of execution. // Activity execution may be cancelled by simply deleting the activity. In such // a case, if execution had not already finished, the done callback would be // called with absl::CancelledError(). class Activity : public Orphanable { public: // Force wakeup from the outside. // This should be rarely needed, and usages should be accompanied with a note // on why it's not possible to wakeup with a Waker object. // Nevertheless, it's sometimes useful for integrations with Activity to force // an Activity to repoll. void ForceWakeup() { MakeOwningWaker().Wakeup(); } // Force the current activity to immediately repoll if it doesn't complete. virtual void ForceImmediateRepoll() = 0; // Return the current activity. // Additionally: // - assert that there is a current activity (and catch bugs if there's not) // - indicate to thread safety analysis that the current activity is indeed // locked // - back up that assertation with a runtime check in debug builds (it's // prohibitively expensive in non-debug builds) static Activity* current() { return g_current_activity_; } // Produce an activity-owning Waker. The produced waker will keep the activity // alive until it's awoken or dropped. virtual Waker MakeOwningWaker() = 0; // Produce a non-owning Waker. The waker will own a small heap allocated weak // pointer to this activity. This is more suitable for wakeups that may not be // delivered until long after the activity should be destroyed. virtual Waker MakeNonOwningWaker() = 0; protected: // Check if this activity is the current activity executing on the current // thread. bool is_current() const { return this == g_current_activity_; } // Check if there is an activity executing on the current thread. static bool have_current() { return g_current_activity_ != nullptr; } // Set the current activity at construction, clean it up at destruction. class ScopedActivity { public: explicit ScopedActivity(Activity* activity) : prior_activity_(g_current_activity_) { g_current_activity_ = activity; } ~ScopedActivity() { g_current_activity_ = prior_activity_; } ScopedActivity(const ScopedActivity&) = delete; ScopedActivity& operator=(const ScopedActivity&) = delete; private: Activity* const prior_activity_; }; private: // Set during RunLoop to the Activity that's executing. // Being set implies that mu_ is held. static GPR_THREAD_LOCAL(Activity*) g_current_activity_; }; // Owned pointer to one Activity. using ActivityPtr = OrphanablePtr; namespace promise_detail { template class ContextHolder { public: using ContextType = Context; explicit ContextHolder(Context value) : value_(std::move(value)) {} Context* GetContext() { return &value_; } private: Context value_; }; template class ContextHolder { public: using ContextType = Context; explicit ContextHolder(Context* value) : value_(value) {} Context* GetContext() { return value_; } private: Context* value_; }; template class ContextHolder> { public: using ContextType = Context; explicit ContextHolder(std::unique_ptr value) : value_(std::move(value)) {} Context* GetContext() { return value_.get(); } private: std::unique_ptr value_; }; template using ContextTypeFromHeld = typename ContextHolder::ContextType; template class ActivityContexts : public ContextHolder... { public: explicit ActivityContexts(Contexts&&... contexts) : ContextHolder(std::forward(contexts))... {} class ScopedContext : public Context>... { public: explicit ScopedContext(ActivityContexts* contexts) : Context>( static_cast*>(contexts) ->GetContext())... {} }; }; // A free standing activity: an activity that owns its own synchronization and // memory. // The alternative is an activity that's somehow tied into another system, for // instance the type seen in promise_based_filter.h as we're transitioning from // the old filter stack to the new system. // FreestandingActivity is-a Wakeable, but needs to increment a refcount before // returning that Wakeable interface. Additionally, we want to keep // FreestandingActivity as small as is possible, since it will be used // everywhere. So we use inheritance to provide the Wakeable interface: this // makes it zero sized, and we make the inheritance private to prevent // accidental casting. class FreestandingActivity : public Activity, private Wakeable { public: Waker MakeOwningWaker() final { Ref(); return Waker(this); } Waker MakeNonOwningWaker() final; void Orphan() final { Cancel(); Unref(); } void ForceImmediateRepoll() final { mu_.AssertHeld(); SetActionDuringRun(ActionDuringRun::kWakeup); } protected: // Action received during a run, in priority order. // If more than one action is received during a run, we use max() to resolve // which one to report (so Cancel overrides Wakeup). enum class ActionDuringRun : uint8_t { kNone, // No action occured during run. kWakeup, // A wakeup occured during run. kCancel, // Cancel was called during run. }; inline ~FreestandingActivity() override { if (handle_) { DropHandle(); } } // Check if we got an internal wakeup since the last time this function was // called. ActionDuringRun GotActionDuringRun() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return std::exchange(action_during_run_, ActionDuringRun::kNone); } // Implementors of Wakeable::Wakeup should call this after the wakeup has // completed. void WakeupComplete() { Unref(); } // Set the action that occured during this run. // We use max to combine actions so that cancellation overrides wakeups. void SetActionDuringRun(ActionDuringRun action) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { action_during_run_ = std::max(action_during_run_, action); } Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; } private: class Handle; // Cancel execution of the underlying promise. virtual void Cancel() = 0; void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); } void Unref() { if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) { delete this; } } // Return a Handle instance with a ref so that it can be stored waiting for // some wakeup. Handle* RefHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // If our refcount is non-zero, ref and return true. // Otherwise, return false. bool RefIfNonzero(); // Drop the (proved existing) wait handle. void DropHandle() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); // All promise execution occurs under this mutex. Mutex mu_; // Current refcount. std::atomic refs_{1}; // If wakeup is called during Promise polling, we set this to Wakeup and // repoll. If cancel is called during Promise polling, we set this to Cancel // and cancel at the end of polling. ActionDuringRun action_during_run_ ABSL_GUARDED_BY(mu_) = ActionDuringRun::kNone; // Handle for long waits. Allows a very small weak pointer type object to // queue for wakeups while Activity may be deleted earlier. Handle* handle_ ABSL_GUARDED_BY(mu_) = nullptr; }; // Implementation details for an Activity of an arbitrary type of promise. // There should exist a static function: // struct WakeupScheduler { // template // void ScheduleWakeup(ActivityType* activity); // }; // This function should arrange that activity->RunScheduledWakeup() be invoked // at the earliest opportunity. // It can assume that activity will remain live until RunScheduledWakeup() is // invoked, and that a given activity will not be concurrently scheduled again // until its RunScheduledWakeup() has been invoked. // We use private inheritance here as a way of getting private members for // each of the contexts. // TODO(ctiller): We can probably reconsider the private inheritance here // when we move away from C++11 and have more powerful template features. template class PromiseActivity final : public FreestandingActivity, private ActivityContexts { public: using Factory = PromiseFactory; using ResultType = typename Factory::Promise::Result; PromiseActivity(F promise_factory, WakeupScheduler wakeup_scheduler, OnDone on_done, Contexts&&... contexts) : FreestandingActivity(), ActivityContexts(std::forward(contexts)...), wakeup_scheduler_(std::move(wakeup_scheduler)), on_done_(std::move(on_done)) { // Lock, construct an initial promise from the factory, and step it. // This may hit a waiter, which could expose our this pointer to other // threads, meaning we do need to hold this mutex even though we're still // constructing. mu()->Lock(); auto status = Start(Factory(std::move(promise_factory))); mu()->Unlock(); // We may complete immediately. if (status.has_value()) { on_done_(std::move(*status)); } } ~PromiseActivity() override { // We shouldn't destruct without calling Cancel() first, and that must get // us to be done_, so we assume that and have no logic to destruct the // promise here. GPR_ASSERT(done_); } void RunScheduledWakeup() { GPR_ASSERT(wakeup_scheduled_.exchange(false, std::memory_order_acq_rel)); Step(); WakeupComplete(); } private: using typename ActivityContexts::ScopedContext; void Cancel() final { if (Activity::is_current()) { mu()->AssertHeld(); SetActionDuringRun(ActionDuringRun::kCancel); return; } bool was_done; { MutexLock lock(mu()); // Check if we were done, and flag done. was_done = done_; if (!done_) MarkDone(); } // If we were not done, then call the on_done callback. if (!was_done) { on_done_(absl::CancelledError()); } } // Wakeup this activity. Arrange to poll the activity again at a convenient // time: this could be inline if it's deemed safe, or it could be by passing // the activity to an external threadpool to run. If the activity is already // running on this thread, a note is taken of such and the activity is // repolled if it doesn't complete. void Wakeup() final { // If there is an active activity, but hey it's us, flag that and we'll loop // in RunLoop (that's calling from above here!). if (Activity::is_current()) { mu()->AssertHeld(); SetActionDuringRun(ActionDuringRun::kWakeup); WakeupComplete(); return; } if (!wakeup_scheduled_.exchange(true, std::memory_order_acq_rel)) { // Can't safely run, so ask to run later. wakeup_scheduler_.ScheduleWakeup(this); } else { // Already a wakeup scheduled for later, drop ref. WakeupComplete(); } } // Drop a wakeup void Drop() final { this->WakeupComplete(); } // Notification that we're no longer executing - it's ok to destruct the // promise. void MarkDone() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { GPR_ASSERT(!done_); done_ = true; Destruct(&promise_holder_.promise); } // In response to Wakeup, run the Promise state machine again until it // settles. Then check for completion, and if we have completed, call on_done. void Step() ABSL_LOCKS_EXCLUDED(mu()) { // Poll the promise until things settle out under a lock. mu()->Lock(); if (done_) { // We might get some spurious wakeups after finishing. mu()->Unlock(); return; } auto status = RunStep(); mu()->Unlock(); if (status.has_value()) { on_done_(std::move(*status)); } } // The main body of a step: set the current activity, and any contexts, and // then run the main polling loop. Contained in a function by itself in // order to keep the scoping rules a little easier in Step(). absl::optional RunStep() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { ScopedActivity scoped_activity(this); ScopedContext contexts(this); return StepLoop(); } // Similarly to RunStep, but additionally construct the promise from a // promise factory before entering the main loop. Called once from the // constructor. absl::optional Start(Factory promise_factory) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { ScopedActivity scoped_activity(this); ScopedContext contexts(this); Construct(&promise_holder_.promise, promise_factory.Once()); return StepLoop(); } // Until there are no wakeups from within and the promise is incomplete: // poll the promise. absl::optional StepLoop() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu()) { GPR_ASSERT(is_current()); while (true) { // Run the promise. GPR_ASSERT(!done_); auto r = promise_holder_.promise(); if (auto* status = absl::get_if(&r)) { // If complete, destroy the promise, flag done, and exit this loop. MarkDone(); return IntoStatus(status); } // Continue looping til no wakeups occur. switch (GotActionDuringRun()) { case ActionDuringRun::kNone: return {}; case ActionDuringRun::kWakeup: break; case ActionDuringRun::kCancel: MarkDone(); return absl::CancelledError(); } } } using Promise = typename Factory::Promise; // Scheduler for wakeups GPR_NO_UNIQUE_ADDRESS WakeupScheduler wakeup_scheduler_; // Callback on completion of the promise. GPR_NO_UNIQUE_ADDRESS OnDone on_done_; // Has execution completed? GPR_NO_UNIQUE_ADDRESS bool done_ ABSL_GUARDED_BY(mu()) = false; // Is there a wakeup scheduled? GPR_NO_UNIQUE_ADDRESS std::atomic wakeup_scheduled_{false}; // We wrap the promise in a union to allow control over the construction // simultaneously with annotating mutex requirements and noting that the // promise contained may not use any memory. union PromiseHolder { PromiseHolder() {} ~PromiseHolder() {} GPR_NO_UNIQUE_ADDRESS Promise promise; }; GPR_NO_UNIQUE_ADDRESS PromiseHolder promise_holder_ ABSL_GUARDED_BY(mu()); }; } // namespace promise_detail // Given a functor that returns a promise (a promise factory), a callback for // completion, and a callback scheduler, construct an activity. template ActivityPtr MakeActivity(Factory promise_factory, WakeupScheduler wakeup_scheduler, OnDone on_done, Contexts&&... contexts) { return ActivityPtr( new promise_detail::PromiseActivity( std::move(promise_factory), std::move(wakeup_scheduler), std::move(on_done), std::forward(contexts)...)); } } // namespace grpc_core #endif // GRPC_CORE_LIB_PROMISE_ACTIVITY_H