/* * * Copyright 2020 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ /// Event engine based on Apple's CFRunLoop API family. If the CFRunLoop engine /// is enabled (see iomgr_posix_cfstream.cc), a global thread is started to /// handle and trigger all the CFStream events. The CFStream streams register /// themselves with the run loop with functions grpc_apple_register_read_stream /// and grpc_apple_register_read_stream. Pollsets are phony and block on a /// condition variable in pollset_work(). #include #include "src/core/lib/iomgr/port.h" #ifdef GRPC_APPLE_EV #include #include #include "absl/time/time.h" #include "src/core/lib/gprpp/thd.h" #include "src/core/lib/gprpp/time_util.h" #include "src/core/lib/iomgr/ev_apple.h" grpc_core::DebugOnlyTraceFlag grpc_apple_polling_trace(false, "apple_polling"); #ifndef NDEBUG #define GRPC_POLLING_TRACE(format, ...) \ if (GRPC_TRACE_FLAG_ENABLED(grpc_apple_polling_trace)) { \ gpr_log(GPR_DEBUG, "(polling) " format, __VA_ARGS__); \ } #else #define GRPC_POLLING_TRACE(...) #endif // NDEBUG #define GRPC_POLLSET_KICK_BROADCAST ((grpc_pollset_worker*)1) struct GlobalRunLoopContext { grpc_core::CondVar init_cv; grpc_core::CondVar input_source_cv; grpc_core::Mutex mu; // Whether an input source registration is pending. Protected by mu. bool input_source_registered = false; // The reference to the global run loop object. Protected by mu. CFRunLoopRef run_loop; // Whether the pollset has been globally shut down. Protected by mu. bool is_shutdown = false; }; struct GrpcAppleWorker { // The condition varible to kick the worker. Works with the pollset's lock // (GrpcApplePollset.mu). grpc_core::CondVar cv; // Whether the worker is kicked. Protected by the pollset's lock // (GrpcApplePollset.mu). bool kicked = false; }; struct GrpcApplePollset { grpc_core::Mutex mu; // Tracks the current workers in the pollset. Protected by mu. std::list workers; // Whether the pollset is shut down. Protected by mu. bool is_shutdown = false; // Closure to call when shutdown is done. Protected by mu. grpc_closure* shutdown_closure; // Whether there's an outstanding kick that was not processed. Protected by // mu. bool kicked_without_poller = false; }; static GlobalRunLoopContext* gGlobalRunLoopContext = nullptr; static grpc_core::Thread* gGlobalRunLoopThread = nullptr; /// Register the stream with the dispatch queue. Callbacks of the stream will be /// issued to the dispatch queue when a network event happens and will be /// managed by Grand Central Dispatch. static void grpc_apple_register_read_stream_queue( CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { CFReadStreamSetDispatchQueue(read_stream, dispatch_queue); } /// Register the stream with the dispatch queue. Callbacks of the stream will be /// issued to the dispatch queue when a network event happens and will be /// managed by Grand Central Dispatch. static void grpc_apple_register_write_stream_queue( CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { CFWriteStreamSetDispatchQueue(write_stream, dispatch_queue); } /// Register the stream with the global run loop. Callbacks of the stream will /// be issued to the run loop when a network event happens and will be driven by /// the global run loop thread gGlobalRunLoopThread. static void grpc_apple_register_read_stream_run_loop( CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { GRPC_POLLING_TRACE("Register read stream: %p", read_stream); grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); CFReadStreamScheduleWithRunLoop(read_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode); gGlobalRunLoopContext->input_source_registered = true; gGlobalRunLoopContext->input_source_cv.Signal(); } /// Register the stream with the global run loop. Callbacks of the stream will /// be issued to the run loop when a network event happens, and will be driven /// by the global run loop thread gGlobalRunLoopThread. static void grpc_apple_register_write_stream_run_loop( CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { GRPC_POLLING_TRACE("Register write stream: %p", write_stream); grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); CFWriteStreamScheduleWithRunLoop( write_stream, gGlobalRunLoopContext->run_loop, kCFRunLoopDefaultMode); gGlobalRunLoopContext->input_source_registered = true; gGlobalRunLoopContext->input_source_cv.Signal(); } /// The default implementation of stream registration is to register the stream /// to a dispatch queue. However, if the CFRunLoop based pollset is enabled (by /// macro and environment variable, see docs in iomgr_posix_cfstream.cc), the /// CFStream streams are registered with the global run loop instead (see /// pollset_global_init below). static void (*grpc_apple_register_read_stream_impl)( CFReadStreamRef, dispatch_queue_t) = grpc_apple_register_read_stream_queue; static void (*grpc_apple_register_write_stream_impl)(CFWriteStreamRef, dispatch_queue_t) = grpc_apple_register_write_stream_queue; void grpc_apple_register_read_stream(CFReadStreamRef read_stream, dispatch_queue_t dispatch_queue) { grpc_apple_register_read_stream_impl(read_stream, dispatch_queue); } void grpc_apple_register_write_stream(CFWriteStreamRef write_stream, dispatch_queue_t dispatch_queue) { grpc_apple_register_write_stream_impl(write_stream, dispatch_queue); } /// Drive the run loop in a global singleton thread until the global run loop is /// shutdown. static void GlobalRunLoopFunc(void* arg) { grpc_core::LockableAndReleasableMutexLock lock(&gGlobalRunLoopContext->mu); gGlobalRunLoopContext->run_loop = CFRunLoopGetCurrent(); gGlobalRunLoopContext->init_cv.Signal(); while (!gGlobalRunLoopContext->is_shutdown) { // CFRunLoopRun() will return immediately if no stream is registered on it. // So we wait on a conditional variable until a stream is registered; // otherwise we'll be running a spinning loop. while (!gGlobalRunLoopContext->input_source_registered) { gGlobalRunLoopContext->input_source_cv.Wait(&gGlobalRunLoopContext->mu); } gGlobalRunLoopContext->input_source_registered = false; lock.Release(); CFRunLoopRun(); lock.Lock(); } lock.Release(); } // pollset implementation static void pollset_global_init(void) { gGlobalRunLoopContext = new GlobalRunLoopContext; grpc_apple_register_read_stream_impl = grpc_apple_register_read_stream_run_loop; grpc_apple_register_write_stream_impl = grpc_apple_register_write_stream_run_loop; grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); gGlobalRunLoopThread = new grpc_core::Thread("apple_ev", GlobalRunLoopFunc, nullptr); gGlobalRunLoopThread->Start(); while (gGlobalRunLoopContext->run_loop == NULL) gGlobalRunLoopContext->init_cv.Wait(&gGlobalRunLoopContext->mu); } static void pollset_global_shutdown(void) { { grpc_core::MutexLock lock(&gGlobalRunLoopContext->mu); gGlobalRunLoopContext->is_shutdown = true; CFRunLoopStop(gGlobalRunLoopContext->run_loop); } gGlobalRunLoopThread->Join(); delete gGlobalRunLoopThread; delete gGlobalRunLoopContext; } /// The caller must acquire the lock GrpcApplePollset.mu before calling this /// function. The lock may be temporarily released when waiting on the condition /// variable but will be re-acquired before the function returns. /// /// The Apple pollset simply waits on a condition variable until it is kicked. /// The network events are handled in the global run loop thread. Processing of /// these events will eventually trigger the kick. static grpc_error* pollset_work(grpc_pollset* pollset, grpc_pollset_worker** worker, grpc_millis deadline) { GRPC_POLLING_TRACE("pollset work: %p, worker: %p, deadline: %" PRIu64, pollset, worker, deadline); GrpcApplePollset* apple_pollset = reinterpret_cast(pollset); GrpcAppleWorker actual_worker; if (worker) { *worker = reinterpret_cast(&actual_worker); } if (apple_pollset->kicked_without_poller) { // Process the outstanding kick and reset the flag. Do not block. apple_pollset->kicked_without_poller = false; } else { // Block until kicked, timed out, or the pollset shuts down. apple_pollset->workers.push_front(&actual_worker); auto it = apple_pollset->workers.begin(); while (!actual_worker.kicked && !apple_pollset->is_shutdown) { if (actual_worker.cv.WaitWithDeadline( &apple_pollset->mu, grpc_core::ToAbslTime(grpc_millis_to_timespec( deadline, GPR_CLOCK_REALTIME)))) { // timed out break; } } apple_pollset->workers.erase(it); // If the pollset is shut down asynchronously and this is the last pending // worker, the shutdown process is complete at this moment and the shutdown // callback will be called. if (apple_pollset->is_shutdown && apple_pollset->workers.empty()) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, apple_pollset->shutdown_closure, GRPC_ERROR_NONE); } } return GRPC_ERROR_NONE; } /// Kick a specific worker. The caller must acquire the lock GrpcApplePollset.mu /// before calling this function. static void kick_worker(GrpcAppleWorker* worker) { worker->kicked = true; worker->cv.Signal(); } /// The caller must acquire the lock GrpcApplePollset.mu before calling this /// function. The kick action simply signals the condition variable of the /// worker. static grpc_error* pollset_kick(grpc_pollset* pollset, grpc_pollset_worker* specific_worker) { GrpcApplePollset* apple_pollset = reinterpret_cast(pollset); GRPC_POLLING_TRACE("pollset kick: %p, worker:%p", pollset, specific_worker); if (specific_worker == nullptr) { if (apple_pollset->workers.empty()) { apple_pollset->kicked_without_poller = true; } else { GrpcAppleWorker* actual_worker = apple_pollset->workers.front(); kick_worker(actual_worker); } } else if (specific_worker == GRPC_POLLSET_KICK_BROADCAST) { for (auto& actual_worker : apple_pollset->workers) { kick_worker(actual_worker); } } else { GrpcAppleWorker* actual_worker = reinterpret_cast(specific_worker); kick_worker(actual_worker); } return GRPC_ERROR_NONE; } static void pollset_init(grpc_pollset* pollset, gpr_mu** mu) { GRPC_POLLING_TRACE("pollset init: %p", pollset); GrpcApplePollset* apple_pollset = new (pollset) GrpcApplePollset(); *mu = grpc_core::GetUnderlyingGprMu(&apple_pollset->mu); } /// The caller must acquire the lock GrpcApplePollset.mu before calling this /// function. static void pollset_shutdown(grpc_pollset* pollset, grpc_closure* closure) { GRPC_POLLING_TRACE("pollset shutdown: %p", pollset); GrpcApplePollset* apple_pollset = reinterpret_cast(pollset); apple_pollset->is_shutdown = true; pollset_kick(pollset, GRPC_POLLSET_KICK_BROADCAST); // If there is any worker blocked, shutdown will be done asynchronously. if (apple_pollset->workers.empty()) { grpc_core::ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE); } else { apple_pollset->shutdown_closure = closure; } } static void pollset_destroy(grpc_pollset* pollset) { GRPC_POLLING_TRACE("pollset destroy: %p", pollset); GrpcApplePollset* apple_pollset = reinterpret_cast(pollset); apple_pollset->~GrpcApplePollset(); } size_t pollset_size(void) { return sizeof(GrpcApplePollset); } grpc_pollset_vtable grpc_apple_pollset_vtable = { pollset_global_init, pollset_global_shutdown, pollset_init, pollset_shutdown, pollset_destroy, pollset_work, pollset_kick, pollset_size}; // pollset_set implementation grpc_pollset_set* pollset_set_create(void) { return nullptr; } void pollset_set_destroy(grpc_pollset_set* pollset_set) {} void pollset_set_add_pollset(grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} void pollset_set_del_pollset(grpc_pollset_set* pollset_set, grpc_pollset* pollset) {} void pollset_set_add_pollset_set(grpc_pollset_set* bag, grpc_pollset_set* item) {} void pollset_set_del_pollset_set(grpc_pollset_set* bag, grpc_pollset_set* item) {} grpc_pollset_set_vtable grpc_apple_pollset_set_vtable = { pollset_set_create, pollset_set_destroy, pollset_set_add_pollset, pollset_set_del_pollset, pollset_set_add_pollset_set, pollset_set_del_pollset_set}; #endif