// Copyright 2022 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #include "src/core/lib/event_engine/posix_engine/ev_poll_posix.h" #include #include #include #include #include #include #include "absl/container/inlined_vector.h" #include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/strings/str_format.h" #include #include #include #include #include #include "src/core/lib/event_engine/poller.h" #include "src/core/lib/event_engine/posix_engine/event_poller.h" #include "src/core/lib/event_engine/posix_engine/posix_engine_closure.h" #include "src/core/lib/gprpp/crash.h" #include "src/core/lib/iomgr/port.h" #ifdef GRPC_POSIX_SOCKET_EV_POLL #include #include #include #include #include #include #include "src/core/lib/event_engine/common_closures.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix.h" #include "src/core/lib/event_engine/posix_engine/wakeup_fd_posix_default.h" #include "src/core/lib/event_engine/time_util.h" #include "src/core/lib/gprpp/fork.h" #include "src/core/lib/gprpp/status_helper.h" #include "src/core/lib/gprpp/strerror.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/gprpp/time.h" static const intptr_t kClosureNotReady = 0; static const intptr_t kClosureReady = 1; static const int kPollinCheck = POLLIN | POLLHUP | POLLERR; static const int kPolloutCheck = POLLOUT | POLLHUP | POLLERR; namespace grpc_event_engine { namespace 
experimental {

// Handles that ended a poll iteration with pending read/write actions.
// NOTE(review): the template arguments were missing in this file. The element
// type follows from PollPoller::Work() pushing PollEventHandle* values into
// the vector; the inline capacity of 5 is an assumption -- confirm.
using Events = absl::InlinedVector<PollEventHandle*, 5>;

// EventHandle implementation used by the poll(2) based PollPoller.
//
// A handle wraps one fd and keeps, for the read and the write direction, a
// closure slot that is either kClosureNotReady, kClosureReady or a pointer to
// the closure waiting for the event. The handle is reference counted: it
// starts with one ref and deletes itself when the count drops to zero.
class PollEventHandle : public EventHandle {
 public:
  // Creates a handle for `fd`, takes a ref on `poller` and links the handle
  // into the poller's handle list while holding the poller's mutex.
  PollEventHandle(int fd, PollPoller* poller)
      : fd_(fd),
        pending_actions_(0),
        fork_fd_list_(this),
        poller_handles_list_(this),
        poller_(poller),
        scheduler_(poller->GetScheduler()),
        is_orphaned_(false),
        is_shutdown_(false),
        closed_(false),
        released_(false),
        pollhup_(false),
        watch_mask_(-1),
        shutdown_error_(absl::OkStatus()),
        exec_actions_closure_([this]() { ExecutePendingActions(); }),
        on_done_(nullptr),
        read_closure_(reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)),
        write_closure_(
            reinterpret_cast<PosixEngineClosure*>(kClosureNotReady)) {
    poller_->Ref();
    grpc_core::MutexLock lock(&poller_->mu_);
    poller_->PollerHandlesListAddHandle(this);
  }

  PollPoller* Poller() override { return poller_; }

  // Records which directions became ready: bit 0 of pending_actions_ marks a
  // pending read, bit 2 a pending write. Returns true (after taking a ref on
  // the handle) if at least one direction is pending.
  bool SetPendingActions(bool pending_read, bool pending_write) {
    if (pending_read) {
      pending_actions_ |= 1;
    }
    if (pending_write) {
      pending_actions_ |= (1 << 2);
    }
    if (pending_read || pending_write) {
      // The closure is going to be executed. We'll Unref this handle in
      // ExecutePendingActions.
      Ref();
      return true;
    }
    return false;
  }

  // Unlinks this handle from the poller's handle list under the poller's
  // mutex.
  void ForceRemoveHandleFromPoller() {
    grpc_core::MutexLock lock(&poller_->mu_);
    poller_->PollerHandlesListRemoveHandle(this);
  }

  int WrappedFd() override { return fd_; }

  bool IsOrphaned() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    return is_orphaned_;
  }

  // Closes the fd at most once, and never if the fd was released to the
  // caller of OrphanHandle.
  void CloseFd() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    if (!released_ && !closed_) {
      closed_ = true;
      close(fd_);
    }
  }

  bool IsPollhup() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { return pollhup_; }

  void SetPollhup(bool pollhup) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    pollhup_ = pollhup;
  }

  // Copies the current watch mask into `watch_mask` and returns whether the
  // handle is currently watched (a mask of -1 means "not watched").
  bool IsWatched(int& watch_mask) const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    watch_mask = watch_mask_;
    return watch_mask_ != -1;
  }

  bool IsWatched() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    return watch_mask_ != -1;
  }

  void SetWatched(int watch_mask) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    watch_mask_ = watch_mask;
  }

  void OrphanHandle(PosixEngineClosure* on_done, int* release_fd,
                    absl::string_view reason) override;
  void ShutdownHandle(absl::Status why) override;
  void NotifyOnRead(PosixEngineClosure* on_read) override;
  void NotifyOnWrite(PosixEngineClosure* on_write) override;
  void NotifyOnError(PosixEngineClosure* on_error) override;
  void SetReadable() override;
  void SetWritable() override;
  void SetHasError() override;

  bool IsHandleShutdown() override {
    grpc_core::MutexLock lock(&mu_);
    return is_shutdown_;
  }

  // Delivers the read/write readiness recorded by SetPendingActions to the
  // closure slots, clears the pending bits and drops the ref that
  // SetPendingActions took.
  inline void ExecutePendingActions() {
    bool kick = false;
    {
      grpc_core::MutexLock lock(&mu_);
      if ((pending_actions_ & 1UL)) {
        if (SetReadyLocked(&read_closure_)) {
          kick = true;
        }
      }
      if (((pending_actions_ >> 2) & 1UL)) {
        if (SetReadyLocked(&write_closure_)) {
          kick = true;
        }
      }
      pending_actions_ = 0;
    }
    if (kick) {
      // SetReadyLocked immediately scheduled some closure. It would have set
      // the closure state to NOT_READY. We need to wakeup the Work(...)
      // thread to start polling on this fd. If this call is not made, it is
      // possible that the poller will reach a state where all the fds under
      // the poller's control are not polled for POLLIN/POLLOUT events thus
      // leading to an indefinitely blocked Work(..) method.
      poller_->KickExternal(false);
    }
    Unref();
  }

  void Ref() { ref_count_.fetch_add(1, std::memory_order_relaxed); }

  // Drops one ref. When the last ref goes away the on_done_ closure (if any)
  // is handed to the scheduler, the poller ref taken in the constructor is
  // released and the handle deletes itself.
  void Unref() {
    if (ref_count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      if (on_done_ != nullptr) {
        scheduler_->Run(on_done_);
      }
      poller_->Unref();
      delete this;
    }
  }

  ~PollEventHandle() override = default;

  grpc_core::Mutex* mu() ABSL_LOCK_RETURNED(mu_) { return &mu_; }

  // Intrusive list links: one for the global fork list, one for the owning
  // poller's handle list.
  PollPoller::HandlesList& ForkFdListPos() { return fork_fd_list_; }
  PollPoller::HandlesList& PollerHandlesListPos() {
    return poller_handles_list_;
  }

  uint32_t BeginPollLocked(uint32_t read_mask, uint32_t write_mask)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
  bool EndPollLocked(bool got_read, bool got_write)
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

 private:
  int SetReadyLocked(PosixEngineClosure** st);
  int NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure);
  // See Epoll1Poller::ShutdownHandle for explanation on why a mutex is
  // required.
  grpc_core::Mutex mu_;
  std::atomic<int> ref_count_{1};
  int fd_;
  // Bit 0: read pending, bit 2: write pending.
  int pending_actions_;
  PollPoller::HandlesList fork_fd_list_;
  PollPoller::HandlesList poller_handles_list_;
  PollPoller* poller_;
  Scheduler* scheduler_;
  bool is_orphaned_;
  bool is_shutdown_;
  bool closed_;
  bool released_;
  bool pollhup_;
  // -1 when the handle is not being watched by a poll iteration.
  int watch_mask_;
  absl::Status shutdown_error_;
  AnyInvocableClosure exec_actions_closure_;
  PosixEngineClosure* on_done_;
  PosixEngineClosure* read_closure_;
  PosixEngineClosure* write_closure_;
};

namespace {
// Only used when GRPC_ENABLE_FORK_SUPPORT=1
std::list<PollPoller*> fork_poller_list;

// Only used when GRPC_ENABLE_FORK_SUPPORT=1
PollEventHandle* fork_fd_list_head = nullptr;
gpr_mu fork_fd_list_mu;

// Pushes `handle` onto the front of the global fork list. No-op unless fork
// support is enabled.
void ForkFdListAddHandle(PollEventHandle* handle) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    handle->ForkFdListPos().next = fork_fd_list_head;
    handle->ForkFdListPos().prev = nullptr;
    if (fork_fd_list_head != nullptr) {
      fork_fd_list_head->ForkFdListPos().prev = handle;
    }
    fork_fd_list_head = handle;
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

// Unlinks `handle` from the global fork list. No-op unless fork support is
// enabled.
void ForkFdListRemoveHandle(PollEventHandle* handle) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    if (fork_fd_list_head == handle) {
      fork_fd_list_head = handle->ForkFdListPos().next;
    }
    if (handle->ForkFdListPos().prev != nullptr) {
      handle->ForkFdListPos().prev->ForkFdListPos().next =
          handle->ForkFdListPos().next;
    }
    if (handle->ForkFdListPos().next != nullptr) {
      handle->ForkFdListPos().next->ForkFdListPos().prev =
          handle->ForkFdListPos().prev;
    }
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

// Remembers `poller` so that it can be deleted in a forked child. No-op
// unless fork support is enabled.
void ForkPollerListAddPoller(PollPoller* poller) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fork_poller_list.push_back(poller);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

// Forgets `poller`. No-op unless fork support is enabled.
void ForkPollerListRemovePoller(PollPoller* poller) {
  if (grpc_core::Fork::Enabled()) {
    gpr_mu_lock(&fork_fd_list_mu);
    fork_poller_list.remove(poller);
    gpr_mu_unlock(&fork_fd_list_mu);
  }
}

// Returns the number of milliseconds elapsed between now and start
timestamp. int PollElapsedTimeToMillis(grpc_core::Timestamp start) { if (start == grpc_core::Timestamp::InfFuture()) return -1; grpc_core::Timestamp now = grpc_core::Timestamp::FromTimespecRoundDown(gpr_now(GPR_CLOCK_MONOTONIC)); int64_t delta = (now - start).millis(); if (delta > INT_MAX) { return INT_MAX; } else if (delta < 0) { return 0; } else { return static_cast(delta); } } bool InitPollPollerPosix(); // Called by the child process's post-fork handler to close open fds, // including the global epoll fd of each poller. This allows gRPC to shutdown // in the child process without interfering with connections or RPCs ongoing // in the parent. void ResetEventManagerOnFork() { // Delete all pending Epoll1EventHandles. gpr_mu_lock(&fork_fd_list_mu); while (fork_fd_list_head != nullptr) { close(fork_fd_list_head->WrappedFd()); PollEventHandle* next = fork_fd_list_head->ForkFdListPos().next; fork_fd_list_head->ForceRemoveHandleFromPoller(); delete fork_fd_list_head; fork_fd_list_head = next; } // Delete all registered pollers. while (!fork_poller_list.empty()) { PollPoller* poller = fork_poller_list.front(); fork_poller_list.pop_front(); delete poller; } gpr_mu_unlock(&fork_fd_list_mu); if (grpc_core::Fork::Enabled()) { gpr_mu_destroy(&fork_fd_list_mu); grpc_core::Fork::SetResetChildPollingEngineFunc(nullptr); } InitPollPollerPosix(); } // It is possible that GLIBC has epoll but the underlying kernel doesn't. 
// Create epoll_fd to make sure epoll support is available bool InitPollPollerPosix() { if (!grpc_event_engine::experimental::SupportsWakeupFd()) { return false; } if (grpc_core::Fork::Enabled()) { gpr_mu_init(&fork_fd_list_mu); grpc_core::Fork::SetResetChildPollingEngineFunc(ResetEventManagerOnFork); } return true; } } // namespace EventHandle* PollPoller::CreateHandle(int fd, absl::string_view /*name*/, bool track_err) { // Avoid unused-parameter warning for debug-only parameter (void)track_err; GPR_DEBUG_ASSERT(track_err == false); PollEventHandle* handle = new PollEventHandle(fd, this); ForkFdListAddHandle(handle); // We need to send a kick to the thread executing Work(..) so that it can // add this new Fd into the list of Fds to poll. KickExternal(false); return handle; } void PollEventHandle::OrphanHandle(PosixEngineClosure* on_done, int* release_fd, absl::string_view /*reason*/) { ForkFdListRemoveHandle(this); ForceRemoveHandleFromPoller(); { grpc_core::ReleasableMutexLock lock(&mu_); on_done_ = on_done; released_ = release_fd != nullptr; if (release_fd != nullptr) { *release_fd = fd_; } GPR_ASSERT(!is_orphaned_); is_orphaned_ = true; // Perform shutdown operations if not already done so. if (!is_shutdown_) { is_shutdown_ = true; shutdown_error_ = absl::Status(absl::StatusCode::kInternal, "FD Orphaned"); grpc_core::StatusSetInt(&shutdown_error_, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); SetReadyLocked(&read_closure_); SetReadyLocked(&write_closure_); } // signal read/write closed to OS so that future operations fail. if (!released_) { shutdown(fd_, SHUT_RDWR); } if (!IsWatched()) { CloseFd(); } else { // It is watched i.e we cannot take action wihout breaking from the // blocking poll. Mark it as Unwatched and kick the thread executing // Work(...). That thread should proceed with the cleanup. 
SetWatched(-1); lock.Release(); poller_->KickExternal(false); } } Unref(); } int PollEventHandle::NotifyOnLocked(PosixEngineClosure** st, PosixEngineClosure* closure) { if (is_shutdown_ || pollhup_) { closure->SetStatus(shutdown_error_); scheduler_->Run(closure); } else if (*st == reinterpret_cast(kClosureNotReady)) { // not ready ==> switch to a waiting state by setting the closure *st = closure; return 0; } else if (*st == reinterpret_cast(kClosureReady)) { // already ready ==> queue the closure to run immediately *st = reinterpret_cast(kClosureNotReady); closure->SetStatus(shutdown_error_); scheduler_->Run(closure); return 1; } else { // upcallptr was set to a different closure. This is an error! grpc_core::Crash( "User called a notify_on function with a previous callback still " "pending"); } return 0; } // returns 1 if state becomes not ready int PollEventHandle::SetReadyLocked(PosixEngineClosure** st) { if (*st == reinterpret_cast(kClosureReady)) { // duplicate ready ==> ignore return 0; } else if (*st == reinterpret_cast(kClosureNotReady)) { // not ready, and not waiting ==> flag ready *st = reinterpret_cast(kClosureReady); return 0; } else { // waiting ==> queue closure PosixEngineClosure* closure = *st; *st = reinterpret_cast(kClosureNotReady); closure->SetStatus(shutdown_error_); scheduler_->Run(closure); return 1; } } void PollEventHandle::ShutdownHandle(absl::Status why) { // We need to take a Ref here because SetReadyLocked may trigger execution // of a closure which calls OrphanHandle or poller->Shutdown() prematurely. Ref(); { grpc_core::MutexLock lock(&mu_); // only shutdown once if (!is_shutdown_) { is_shutdown_ = true; shutdown_error_ = why; grpc_core::StatusSetInt(&shutdown_error_, grpc_core::StatusIntProperty::kRpcStatus, GRPC_STATUS_UNAVAILABLE); SetReadyLocked(&read_closure_); SetReadyLocked(&write_closure_); } } // For the Ref() taken at the begining of this function. 
Unref(); } void PollEventHandle::NotifyOnRead(PosixEngineClosure* on_read) { // We need to take a Ref here because NotifyOnLocked may trigger execution // of a closure which calls OrphanHandle that may delete this object or call // poller->Shutdown() prematurely. Ref(); { grpc_core::ReleasableMutexLock lock(&mu_); if (NotifyOnLocked(&read_closure_, on_read)) { lock.Release(); // NotifyOnLocked immediately scheduled some closure. It would have set // the closure state to NOT_READY. We need to wakeup the Work(...) thread // to start polling on this fd. If this call is not made, it is possible // that the poller will reach a state where all the fds under the // poller's control are not polled for POLLIN/POLLOUT events thus leading // to an indefinitely blocked Work(..) method. poller_->KickExternal(false); } } // For the Ref() taken at the begining of this function. Unref(); } void PollEventHandle::NotifyOnWrite(PosixEngineClosure* on_write) { // We need to take a Ref here because NotifyOnLocked may trigger execution // of a closure which calls OrphanHandle that may delete this object or call // poller->Shutdown() prematurely. Ref(); { grpc_core::ReleasableMutexLock lock(&mu_); if (NotifyOnLocked(&write_closure_, on_write)) { lock.Release(); // NotifyOnLocked immediately scheduled some closure. It would have set // the closure state to NOT_READY. We need to wakeup the Work(...) thread // to start polling on this fd. If this call is not made, it is possible // that the poller will reach a state where all the fds under the // poller's control are not polled for POLLIN/POLLOUT events thus leading // to an indefinitely blocked Work(..) method. poller_->KickExternal(false); } } // For the Ref() taken at the begining of this function. 
Unref(); } void PollEventHandle::NotifyOnError(PosixEngineClosure* on_error) { on_error->SetStatus( absl::Status(absl::StatusCode::kCancelled, "Polling engine does not support tracking errors")); scheduler_->Run(on_error); } void PollEventHandle::SetReadable() { Ref(); { grpc_core::MutexLock lock(&mu_); SetReadyLocked(&read_closure_); } Unref(); } void PollEventHandle::SetWritable() { Ref(); { grpc_core::MutexLock lock(&mu_); SetReadyLocked(&write_closure_); } Unref(); } void PollEventHandle::SetHasError() {} uint32_t PollEventHandle::BeginPollLocked(uint32_t read_mask, uint32_t write_mask) { uint32_t mask = 0; bool read_ready = (pending_actions_ & 1UL); bool write_ready = ((pending_actions_ >> 2) & 1UL); Ref(); // If we are shutdown, then no need to poll this fd. Set watch_mask to 0. if (is_shutdown_) { SetWatched(0); return 0; } // If there is nobody polling for read, but we need to, then start doing so. if (read_mask && !read_ready && read_closure_ != reinterpret_cast(kClosureReady)) { mask |= read_mask; } // If there is nobody polling for write, but we need to, then start doing so if (write_mask && !write_ready && write_closure_ != reinterpret_cast(kClosureReady)) { mask |= write_mask; } SetWatched(mask); return mask; } bool PollEventHandle::EndPollLocked(bool got_read, bool got_write) { if (is_orphaned_ && !IsWatched()) { CloseFd(); } else if (!is_orphaned_) { return SetPendingActions(got_read, got_write); } return false; } void PollPoller::KickExternal(bool ext) { grpc_core::MutexLock lock(&mu_); if (was_kicked_) { if (ext) { was_kicked_ext_ = true; } return; } was_kicked_ = true; was_kicked_ext_ = ext; GPR_ASSERT(wakeup_fd_->Wakeup().ok()); } void PollPoller::Kick() { KickExternal(true); } void PollPoller::PollerHandlesListAddHandle(PollEventHandle* handle) { handle->PollerHandlesListPos().next = poll_handles_list_head_; handle->PollerHandlesListPos().prev = nullptr; if (poll_handles_list_head_ != nullptr) { 
poll_handles_list_head_->PollerHandlesListPos().prev = handle; } poll_handles_list_head_ = handle; ++num_poll_handles_; } void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* handle) { if (poll_handles_list_head_ == handle) { poll_handles_list_head_ = handle->PollerHandlesListPos().next; } if (handle->PollerHandlesListPos().prev != nullptr) { handle->PollerHandlesListPos().prev->PollerHandlesListPos().next = handle->PollerHandlesListPos().next; } if (handle->PollerHandlesListPos().next != nullptr) { handle->PollerHandlesListPos().next->PollerHandlesListPos().prev = handle->PollerHandlesListPos().prev; } --num_poll_handles_; } PollPoller::PollPoller(Scheduler* scheduler) : scheduler_(scheduler), use_phony_poll_(false), was_kicked_(false), was_kicked_ext_(false), num_poll_handles_(0), poll_handles_list_head_(nullptr) { wakeup_fd_ = *CreateWakeupFd(); GPR_ASSERT(wakeup_fd_ != nullptr); ForkPollerListAddPoller(this); } PollPoller::PollPoller(Scheduler* scheduler, bool use_phony_poll) : scheduler_(scheduler), use_phony_poll_(use_phony_poll), was_kicked_(false), was_kicked_ext_(false), num_poll_handles_(0), poll_handles_list_head_(nullptr) { wakeup_fd_ = *CreateWakeupFd(); GPR_ASSERT(wakeup_fd_ != nullptr); ForkPollerListAddPoller(this); } PollPoller::~PollPoller() { // Assert that no active handles are present at the time of destruction. // They should have been orphaned before reaching this state. GPR_ASSERT(num_poll_handles_ == 0); GPR_ASSERT(poll_handles_list_head_ == nullptr); } Poller::WorkResult PollPoller::Work( EventEngine::Duration timeout, absl::FunctionRef schedule_poll_again) { // Avoid malloc for small number of elements. 
enum { inline_elements = 96 }; struct pollfd pollfd_space[inline_elements]; bool was_kicked_ext = false; PollEventHandle* watcher_space[inline_elements]; Events pending_events; pending_events.clear(); int timeout_ms = static_cast(grpc_event_engine::experimental::Milliseconds(timeout)); mu_.Lock(); // Start polling, and keep doing so while we're being asked to // re-evaluate our pollers (this allows poll() based pollers to // ensure they don't miss wakeups). while (pending_events.empty() && timeout_ms >= 0) { int r = 0; size_t i; nfds_t pfd_count; struct pollfd* pfds; PollEventHandle** watchers; // Estimate start time for a poll iteration. grpc_core::Timestamp start = grpc_core::Timestamp::FromTimespecRoundDown( gpr_now(GPR_CLOCK_MONOTONIC)); if (num_poll_handles_ + 2 <= inline_elements) { pfds = pollfd_space; watchers = watcher_space; } else { const size_t pfd_size = sizeof(*pfds) * (num_poll_handles_ + 2); const size_t watch_size = sizeof(*watchers) * (num_poll_handles_ + 2); void* buf = gpr_malloc(pfd_size + watch_size); pfds = static_cast(buf); watchers = static_cast( static_cast((static_cast(buf) + pfd_size))); pfds = static_cast(buf); } pfd_count = 1; pfds[0].fd = wakeup_fd_->ReadFd(); pfds[0].events = POLLIN; pfds[0].revents = 0; PollEventHandle* head = poll_handles_list_head_; while (head != nullptr) { { grpc_core::MutexLock lock(head->mu()); // There shouldn't be any orphaned fds at this point. This is because // prior to marking a handle as orphaned it is first removed from // poll handle list for the poller under the poller lock. GPR_ASSERT(!head->IsOrphaned()); if (!head->IsPollhup()) { pfds[pfd_count].fd = head->WrappedFd(); watchers[pfd_count] = head; // BeginPollLocked takes a ref of the handle. It also marks the // fd as Watched with an appropriate watch_mask. The watch_mask // is 0 if the fd is shutdown or if the fd is already ready (i.e // both read and write events are already available) and doesn't // need to be polled again. 
The watch_mask is > 0 otherwise // indicating the fd needs to be polled. pfds[pfd_count].events = head->BeginPollLocked(POLLIN, POLLOUT); pfd_count++; } } head = head->PollerHandlesListPos().next; } mu_.Unlock(); if (!use_phony_poll_ || timeout_ms == 0 || pfd_count == 1) { // If use_phony_poll is true and pfd_count == 1, it implies only the // wakeup_fd is present. Allow the call to get blocked in this case as // well instead of crashing. This is because the poller::Work is called // right after an event enging is constructed. Even if phony poll is // expected to be used, we dont want to check for it until some actual // event handles are registered. Otherwise the EventEngine construction // may crash. r = poll(pfds, pfd_count, timeout_ms); } else { grpc_core::Crash("Attempted a blocking poll when declared non-polling."); } if (r <= 0) { if (r < 0 && errno != EINTR) { // Abort fail here. grpc_core::Crash(absl::StrFormat( "(event_engine) PollPoller:%p encountered poll error: %s", this, grpc_core::StrError(errno).c_str())); } for (i = 1; i < pfd_count; i++) { PollEventHandle* head = watchers[i]; int watch_mask; grpc_core::ReleasableMutexLock lock(head->mu()); if (head->IsWatched(watch_mask)) { head->SetWatched(-1); // This fd was Watched with a watch mask > 0. if (watch_mask > 0 && r < 0) { // This case implies the fd was polled (since watch_mask > 0 and // the poll returned an error. Mark the fds as both readable and // writable. if (head->EndPollLocked(true, true)) { // Its safe to add to list of pending events because // EndPollLocked returns true only when the handle is // not orphaned. But an orphan might be initiated on the handle // after this Work() method returns and before the next Work() // method is invoked. pending_events.push_back(head); } } else { // In this case, (1) watch_mask > 0 && r == 0 or (2) watch_mask == // 0 and r < 0 or (3) watch_mask == 0 and r == 0. For case-1, no // events are pending on the fd even though the fd was polled. 
For // case-2 and 3, the fd was not polled head->EndPollLocked(false, false); } } else { // It can enter this case if an orphan was invoked on the handle // while it was being polled. head->EndPollLocked(false, false); } lock.Release(); // Unref the ref taken at BeginPollLocked. head->Unref(); } } else { if (pfds[0].revents & kPollinCheck) { GPR_ASSERT(wakeup_fd_->ConsumeWakeup().ok()); } for (i = 1; i < pfd_count; i++) { PollEventHandle* head = watchers[i]; int watch_mask; grpc_core::ReleasableMutexLock lock(head->mu()); if (!head->IsWatched(watch_mask) || watch_mask == 0) { // IsWatched will be false if an orphan was invoked on the // handle while it was being polled. If watch_mask is 0, then the fd // was not polled. head->SetWatched(-1); head->EndPollLocked(false, false); } else { // Watched is true and watch_mask > 0 if (pfds[i].revents & POLLHUP) { head->SetPollhup(true); } head->SetWatched(-1); if (head->EndPollLocked(pfds[i].revents & kPollinCheck, pfds[i].revents & kPolloutCheck)) { // Its safe to add to list of pending events because EndPollLocked // returns true only when the handle is not orphaned. // But an orphan might be initiated on the handle after this // Work() method returns and before the next Work() method is // invoked. pending_events.push_back(head); } } lock.Release(); // Unref the ref taken at BeginPollLocked. head->Unref(); } } if (pfds != pollfd_space) { gpr_free(pfds); } // End of poll iteration. Update how much time is remaining. timeout_ms -= PollElapsedTimeToMillis(start); mu_.Lock(); if (std::exchange(was_kicked_, false) && std::exchange(was_kicked_ext_, false)) { // External kick. Need to break out. was_kicked_ext = true; break; } } mu_.Unlock(); if (pending_events.empty()) { if (was_kicked_ext) { return Poller::WorkResult::kKicked; } return Poller::WorkResult::kDeadlineExceeded; } // Run the provided callback synchronously. schedule_poll_again(); // Process all pending events inline. 
for (auto& it : pending_events) { it->ExecutePendingActions(); } return was_kicked_ext ? Poller::WorkResult::kKicked : Poller::WorkResult::kOk; } void PollPoller::Shutdown() { ForkPollerListRemovePoller(this); Unref(); } PollPoller* MakePollPoller(Scheduler* scheduler, bool use_phony_poll) { static bool kPollPollerSupported = InitPollPollerPosix(); if (kPollPollerSupported) { return new PollPoller(scheduler, use_phony_poll); } return nullptr; } } // namespace experimental } // namespace grpc_event_engine #else // GRPC_POSIX_SOCKET_EV_POLL #include "src/core/lib/gprpp/crash.h" namespace grpc_event_engine { namespace experimental { PollPoller::PollPoller(Scheduler* /* engine */) { grpc_core::Crash("unimplemented"); } void PollPoller::Shutdown() { grpc_core::Crash("unimplemented"); } PollPoller::~PollPoller() { grpc_core::Crash("unimplemented"); } EventHandle* PollPoller::CreateHandle(int /*fd*/, absl::string_view /*name*/, bool /*track_err*/) { grpc_core::Crash("unimplemented"); } Poller::WorkResult PollPoller::Work( EventEngine::Duration /*timeout*/, absl::FunctionRef /*schedule_poll_again*/) { grpc_core::Crash("unimplemented"); } void PollPoller::Kick() { grpc_core::Crash("unimplemented"); } // If GRPC_LINUX_EPOLL is not defined, it means epoll is not available. Return // nullptr. PollPoller* MakePollPoller(Scheduler* /*scheduler*/, bool /* use_phony_poll */) { return nullptr; } void PollPoller::KickExternal(bool /*ext*/) { grpc_core::Crash("unimplemented"); } void PollPoller::PollerHandlesListAddHandle(PollEventHandle* /*handle*/) { grpc_core::Crash("unimplemented"); } void PollPoller::PollerHandlesListRemoveHandle(PollEventHandle* /*handle*/) { grpc_core::Crash("unimplemented"); } } // namespace experimental } // namespace grpc_event_engine #endif // GRPC_POSIX_SOCKET_EV_POLL