// Copyright 2021 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef GRPC_EVENT_ENGINE_EVENT_ENGINE_H #define GRPC_EVENT_ENGINE_EVENT_ENGINE_H #include #include #include #include #include #include #include #include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/status/statusor.h" // TODO(vigneshbabu): Define the Endpoint::Write metrics collection system namespace grpc_event_engine { namespace experimental { //////////////////////////////////////////////////////////////////////////////// /// The EventEngine Interface /// /// Overview /// -------- /// /// The EventEngine encapsulates all platform-specific behaviors related to low /// level network I/O, timers, asynchronous execution, and DNS resolution. /// /// This interface allows developers to provide their own event management and /// network stacks. Motivating uses cases for supporting custom EventEngines /// include the ability to hook into external event loops, and using different /// EventEngine instances for each channel to better insulate network I/O and /// callback processing from other channels. /// /// A default cross-platform EventEngine instance is provided by gRPC. /// /// Lifespan and Ownership /// ---------------------- /// /// gRPC takes shared ownership of EventEngines via std::shared_ptrs to ensure /// that the engines remain available until they are no longer needed. Depending /// on the use case, engines may live until gRPC is shut down. /// /// EXAMPLE USAGE (Not yet implemented) /// /// Custom EventEngines can be specified per channel, and allow configuration /// for both clients and servers. To set a custom EventEngine for a client /// channel, you can do something like the following: /// /// ChannelArguments args; /// std::shared_ptr engine = std::make_shared(...); /// args.SetEventEngine(engine); /// MyAppClient client(grpc::CreateCustomChannel( /// "localhost:50051", grpc::InsecureChannelCredentials(), args)); /// /// A gRPC server can use a custom EventEngine by calling the /// ServerBuilder::SetEventEngine method: /// /// ServerBuilder builder; /// std::shared_ptr engine = std::make_shared(...); /// builder.SetEventEngine(engine); /// std::unique_ptr server(builder.BuildAndStart()); /// server->Wait(); /// /// /// Blocking EventEngine Callbacks /// ------------------------------ /// /// Doing blocking work in EventEngine callbacks is generally not advisable. /// While gRPC's default EventEngine implementations have some capacity to scale /// their thread pools to avoid starvation, this is not an instantaneous /// process. Further, user-provided EventEngines may not be optimized to handle /// excessive blocking work at all. /// /// *Best Practice* : Occasional blocking work may be fine, but we do not /// recommend running a mostly blocking workload in EventEngine threads. /// /// /// Thread-safety guarantees /// ------------------------ /// /// All EventEngine methods are guaranteed to be thread-safe, no external /// synchronization is required to call any EventEngine method. Please note that /// this does not apply to application callbacks, which may be run concurrently; /// application state synchronization must be managed by the application. /// //////////////////////////////////////////////////////////////////////////////// class EventEngine : public std::enable_shared_from_this, public Extensible { public: /// A duration between two events. /// /// Throughout the EventEngine API durations are used to express how long /// until an action should be performed. using Duration = std::chrono::duration; /// A custom closure type for EventEngine task execution. /// /// Throughout the EventEngine API, \a Closure ownership is retained by the /// caller - the EventEngine will never delete a Closure, and upon /// cancellation, the EventEngine will simply forget the Closure exists. The /// caller is responsible for all necessary cleanup. class Closure { public: Closure() = default; // Closure's are an interface, and thus non-copyable. Closure(const Closure&) = delete; Closure& operator=(const Closure&) = delete; // Polymorphic type => virtual destructor virtual ~Closure() = default; // Run the contained code. virtual void Run() = 0; }; /// Represents a scheduled task. /// /// \a TaskHandles are returned by \a Run* methods, and can be given to the /// \a Cancel method. struct TaskHandle { intptr_t keys[2]; static const GRPC_DLL TaskHandle kInvalid; }; /// A handle to a cancellable connection attempt. /// /// Returned by \a Connect, and can be passed to \a CancelConnect. struct ConnectionHandle { intptr_t keys[2]; static const GRPC_DLL ConnectionHandle kInvalid; }; /// Thin wrapper around a platform-specific sockaddr type. A sockaddr struct /// exists on all platforms that gRPC supports. /// /// Platforms are expected to provide definitions for: /// * sockaddr /// * sockaddr_in /// * sockaddr_in6 class ResolvedAddress { public: static constexpr socklen_t MAX_SIZE_BYTES = 128; ResolvedAddress(const sockaddr* address, socklen_t size); ResolvedAddress() = default; ResolvedAddress(const ResolvedAddress&) = default; const struct sockaddr* address() const; socklen_t size() const; private: char address_[MAX_SIZE_BYTES] = {}; socklen_t size_ = 0; }; /// One end of a connection between a gRPC client and server. Endpoints are /// created when connections are established, and Endpoint operations are /// gRPC's primary means of communication. /// /// Endpoints must use the provided MemoryAllocator for all data buffer memory /// allocations. gRPC allows applications to set memory constraints per /// Channel or Server, and the implementation depends on all dynamic memory /// allocation being handled by the quota system. class Endpoint : public Extensible { public: /// Shuts down all connections and invokes all pending read or write /// callbacks with an error status. virtual ~Endpoint() = default; /// A struct representing optional arguments that may be provided to an /// EventEngine Endpoint Read API call. /// /// Passed as argument to an Endpoint \a Read struct ReadArgs { // A suggestion to the endpoint implementation to read at-least the // specified number of bytes over the network connection before marking // the endpoint read operation as complete. gRPC may use this argument // to minimize the number of endpoint read API calls over the lifetime // of a connection. int64_t read_hint_bytes; }; /// Reads data from the Endpoint. /// /// When data is available on the connection, that data is moved into the /// \a buffer. If the read succeeds immediately, it returns true and the \a /// on_read callback is not executed. Otherwise it returns false and the \a /// on_read callback executes asynchronously when the read completes. The /// caller must ensure that the callback has access to the buffer when it /// executes. Ownership of the buffer is not transferred. Valid slices *may* /// be placed into the buffer even if the callback is invoked with a non-OK /// Status. /// /// There can be at most one outstanding read per Endpoint at any given /// time. An outstanding read is one in which the \a on_read callback has /// not yet been executed for some previous call to \a Read. If an attempt /// is made to call \a Read while a previous read is still outstanding, the /// \a EventEngine must abort. /// /// For failed read operations, implementations should pass the appropriate /// statuses to \a on_read. For example, callbacks might expect to receive /// CANCELLED on endpoint shutdown. virtual bool Read(absl::AnyInvocable on_read, SliceBuffer* buffer, const ReadArgs* args) = 0; /// A struct representing optional arguments that may be provided to an /// EventEngine Endpoint Write API call. /// /// Passed as argument to an Endpoint \a Write struct WriteArgs { // Represents private information that may be passed by gRPC for // select endpoints expected to be used only within google. void* google_specific = nullptr; // A suggestion to the endpoint implementation to group data to be written // into frames of the specified max_frame_size. gRPC may use this // argument to dynamically control the max sizes of frames sent to a // receiver in response to high receiver memory pressure. int64_t max_frame_size; }; /// Writes data out on the connection. /// /// If the write succeeds immediately, it returns true and the /// \a on_writable callback is not executed. Otherwise it returns false and /// the \a on_writable callback is called asynchronously when the connection /// is ready for more data. The Slices within the \a data buffer may be /// mutated at will by the Endpoint until \a on_writable is called. The \a /// data SliceBuffer will remain valid after calling \a Write, but its state /// is otherwise undefined. All bytes in \a data must have been written /// before calling \a on_writable unless an error has occurred. /// /// There can be at most one outstanding write per Endpoint at any given /// time. An outstanding write is one in which the \a on_writable callback /// has not yet been executed for some previous call to \a Write. If an /// attempt is made to call \a Write while a previous write is still /// outstanding, the \a EventEngine must abort. /// /// For failed write operations, implementations should pass the appropriate /// statuses to \a on_writable. For example, callbacks might expect to /// receive CANCELLED on endpoint shutdown. virtual bool Write(absl::AnyInvocable on_writable, SliceBuffer* data, const WriteArgs* args) = 0; /// Returns an address in the format described in DNSResolver. The returned /// values are expected to remain valid for the life of the Endpoint. virtual const ResolvedAddress& GetPeerAddress() const = 0; virtual const ResolvedAddress& GetLocalAddress() const = 0; }; /// Called when a new connection is established. /// /// If the connection attempt was not successful, implementations should pass /// the appropriate statuses to this callback. For example, callbacks might /// expect to receive DEADLINE_EXCEEDED statuses when appropriate, or /// CANCELLED statuses on EventEngine shutdown. using OnConnectCallback = absl::AnyInvocable>)>; /// Listens for incoming connection requests from gRPC clients and initiates /// request processing once connections are established. class Listener : public Extensible { public: /// Called when the listener has accepted a new client connection. using AcceptCallback = absl::AnyInvocable, MemoryAllocator memory_allocator)>; virtual ~Listener() = default; /// Bind an address/port to this Listener. /// /// It is expected that multiple addresses/ports can be bound to this /// Listener before Listener::Start has been called. Returns either the /// bound port or an appropriate error status. virtual absl::StatusOr Bind(const ResolvedAddress& addr) = 0; virtual absl::Status Start() = 0; }; /// Factory method to create a network listener / server. /// /// Once a \a Listener is created and started, the \a on_accept callback will /// be called once asynchronously for each established connection. This method /// may return a non-OK status immediately if an error was encountered in any /// synchronous steps required to create the Listener. In this case, /// \a on_shutdown will never be called. /// /// If this method returns a Listener, then \a on_shutdown will be invoked /// exactly once when the Listener is shut down, and only after all /// \a on_accept callbacks have finished executing. The status passed to it /// will indicate if there was a problem during shutdown. /// /// The provided \a MemoryAllocatorFactory is used to create \a /// MemoryAllocators for Endpoint construction. virtual absl::StatusOr> CreateListener( Listener::AcceptCallback on_accept, absl::AnyInvocable on_shutdown, const EndpointConfig& config, std::unique_ptr memory_allocator_factory) = 0; /// Creates a client network connection to a remote network listener. /// /// Even in the event of an error, it is expected that the \a on_connect /// callback will be asynchronously executed exactly once by the EventEngine. /// A connection attempt can be cancelled using the \a CancelConnect method. /// /// Implementation Note: it is important that the \a memory_allocator be used /// for all read/write buffer allocations in the EventEngine implementation. /// This allows gRPC's \a ResourceQuota system to monitor and control memory /// usage with graceful degradation mechanisms. Please see the \a /// MemoryAllocator API for more information. virtual ConnectionHandle Connect(OnConnectCallback on_connect, const ResolvedAddress& addr, const EndpointConfig& args, MemoryAllocator memory_allocator, Duration timeout) = 0; /// Request cancellation of a connection attempt. /// /// If the associated connection has already been completed, it will not be /// cancelled, and this method will return false. /// /// If the associated connection has not been completed, it will be cancelled, /// and this method will return true. The \a OnConnectCallback will not be /// called, and \a on_connect will be destroyed before this method returns. virtual bool CancelConnect(ConnectionHandle handle) = 0; /// Provides asynchronous resolution. /// /// This object has a destruction-is-cancellation semantic. /// Implementations should make sure that all pending requests are cancelled /// when the object is destroyed and all pending callbacks will be called /// shortly. If cancellation races with request completion, implementations /// may choose to either cancel or satisfy the request. class DNSResolver { public: /// Optional configuration for DNSResolvers. struct ResolverOptions { /// If empty, default DNS servers will be used. /// Must be in the "IP:port" format as described in naming.md. std::string dns_server; }; /// DNS SRV record type. struct SRVRecord { std::string host; int port = 0; int priority = 0; int weight = 0; }; /// Called with the collection of sockaddrs that were resolved from a given /// target address. using LookupHostnameCallback = absl::AnyInvocable>)>; /// Called with a collection of SRV records. using LookupSRVCallback = absl::AnyInvocable>)>; /// Called with the result of a TXT record lookup using LookupTXTCallback = absl::AnyInvocable>)>; virtual ~DNSResolver() = default; /// Asynchronously resolve an address. /// /// \a default_port may be a non-numeric named service port, and will only /// be used if \a address does not already contain a port component. /// /// When the lookup is complete or cancelled, the \a on_resolve callback /// will be invoked with a status indicating the success or failure of the /// lookup. Implementations should pass the appropriate statuses to the /// callback. For example, callbacks might expect to receive CANCELLED or /// NOT_FOUND. virtual void LookupHostname(LookupHostnameCallback on_resolve, absl::string_view name, absl::string_view default_port) = 0; /// Asynchronously perform an SRV record lookup. /// /// \a on_resolve has the same meaning and expectations as \a /// LookupHostname's \a on_resolve callback. virtual void LookupSRV(LookupSRVCallback on_resolve, absl::string_view name) = 0; /// Asynchronously perform a TXT record lookup. /// /// \a on_resolve has the same meaning and expectations as \a /// LookupHostname's \a on_resolve callback. virtual void LookupTXT(LookupTXTCallback on_resolve, absl::string_view name) = 0; }; /// At time of destruction, the EventEngine must have no active /// responsibilities. EventEngine users (applications) are responsible for /// cancelling all tasks and DNS lookups, shutting down listeners and /// endpoints, prior to EventEngine destruction. If there are any outstanding /// tasks, any running listeners, etc. at time of EventEngine destruction, /// that is an invalid use of the API, and it will result in undefined /// behavior. virtual ~EventEngine() = default; // TODO(nnoble): consider whether we can remove this method before we // de-experimentalize this API. virtual bool IsWorkerThread() = 0; /// Creates and returns an instance of a DNSResolver, optionally configured by /// the \a options struct. This method may return a non-OK status if an error /// occurred when creating the DNSResolver. If the caller requests a custom /// DNS server, and the EventEngine implementation does not support it, this /// must return an error. virtual absl::StatusOr> GetDNSResolver( const DNSResolver::ResolverOptions& options) = 0; /// Asynchronously executes a task as soon as possible. /// /// \a Closures passed to \a Run cannot be cancelled. The \a closure will not /// be deleted after it has been run, ownership remains with the caller. /// /// Implementations must not execute the closure in the calling thread before /// \a Run returns. For example, if the caller must release a lock before the /// closure can proceed, running the closure immediately would cause a /// deadlock. virtual void Run(Closure* closure) = 0; /// Asynchronously executes a task as soon as possible. /// /// \a Closures passed to \a Run cannot be cancelled. Unlike the overloaded \a /// Closure alternative, the absl::AnyInvocable version's \a closure will be /// deleted by the EventEngine after the closure has been run. /// /// This version of \a Run may be less performant than the \a Closure version /// in some scenarios. This overload is useful in situations where performance /// is not a critical concern. /// /// Implementations must not execute the closure in the calling thread before /// \a Run returns. virtual void Run(absl::AnyInvocable closure) = 0; /// Synonymous with scheduling an alarm to run after duration \a when. /// /// The \a closure will execute when time \a when arrives unless it has been /// cancelled via the \a Cancel method. If cancelled, the closure will not be /// run, nor will it be deleted. Ownership remains with the caller. /// /// Implementations must not execute the closure in the calling thread before /// \a RunAfter returns. /// /// Implementations may return a \a kInvalid handle if the callback can be /// immediately executed, and is therefore not cancellable. virtual TaskHandle RunAfter(Duration when, Closure* closure) = 0; /// Synonymous with scheduling an alarm to run after duration \a when. /// /// The \a closure will execute when time \a when arrives unless it has been /// cancelled via the \a Cancel method. If cancelled, the closure will not be /// run. Unlike the overloaded \a Closure alternative, the absl::AnyInvocable /// version's \a closure will be deleted by the EventEngine after the closure /// has been run, or upon cancellation. /// /// This version of \a RunAfter may be less performant than the \a Closure /// version in some scenarios. This overload is useful in situations where /// performance is not a critical concern. /// /// Implementations must not execute the closure in the calling thread before /// \a RunAfter returns. virtual TaskHandle RunAfter(Duration when, absl::AnyInvocable closure) = 0; /// Request cancellation of a task. /// /// If the associated closure cannot be cancelled for any reason, this /// function will return false. /// /// If the associated closure can be cancelled, the associated callback will /// never be run, and this method will return true. If the callback type was /// an absl::AnyInvocable, it will be destroyed before the method returns. virtual bool Cancel(TaskHandle handle) = 0; }; /// Replace gRPC's default EventEngine factory. /// /// Applications may call \a SetEventEngineFactory at any time to replace the /// default factory used within gRPC. EventEngines will be created when /// necessary, when they are otherwise not provided by the application. /// /// To be certain that none of the gRPC-provided built-in EventEngines are /// created, applications must set a custom EventEngine factory method *before* /// grpc is initialized. void SetEventEngineFactory( absl::AnyInvocable()> factory); /// Reset gRPC's EventEngine factory to the built-in default. /// /// Applications that have called \a SetEventEngineFactory can remove their /// custom factory using this method. The built-in EventEngine factories will be /// used going forward. This has no affect on any EventEngines that were created /// using the previous factories. void EventEngineFactoryReset(); /// Create an EventEngine using the default factory. std::unique_ptr CreateEventEngine(); bool operator==(const EventEngine::TaskHandle& lhs, const EventEngine::TaskHandle& rhs); bool operator!=(const EventEngine::TaskHandle& lhs, const EventEngine::TaskHandle& rhs); std::ostream& operator<<(std::ostream& out, const EventEngine::TaskHandle& handle); bool operator==(const EventEngine::ConnectionHandle& lhs, const EventEngine::ConnectionHandle& rhs); bool operator!=(const EventEngine::ConnectionHandle& lhs, const EventEngine::ConnectionHandle& rhs); std::ostream& operator<<(std::ostream& out, const EventEngine::ConnectionHandle& handle); namespace detail { std::string FormatHandleString(uint64_t key1, uint64_t key2); } template void AbslStringify(Sink& out, const EventEngine::ConnectionHandle& handle) { out.Append(detail::FormatHandleString(handle.keys[0], handle.keys[1])); } template void AbslStringify(Sink& out, const EventEngine::TaskHandle& handle) { out.Append(detail::FormatHandleString(handle.keys[0], handle.keys[1])); } } // namespace experimental } // namespace grpc_event_engine #endif // GRPC_EVENT_ENGINE_EVENT_ENGINE_H