// Copyright 2023 The gRPC Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include #ifdef GPR_APPLE #include "src/core/lib/event_engine/cf_engine/cfstream_endpoint.h" #include "src/core/lib/event_engine/trace.h" #include "src/core/lib/gprpp/strerror.h" namespace grpc_event_engine { namespace experimental { namespace { int kDefaultReadBufferSize = 8192; absl::Status CFErrorToStatus(CFTypeUniqueRef cf_error) { if (cf_error == nullptr) { return absl::OkStatus(); } CFErrorDomain cf_domain = CFErrorGetDomain((cf_error)); CFIndex code = CFErrorGetCode((cf_error)); CFTypeUniqueRef cf_desc = CFErrorCopyDescription((cf_error)); char domain_buf[256]; char desc_buf[256]; CFStringGetCString(cf_domain, domain_buf, 256, kCFStringEncodingUTF8); CFStringGetCString(cf_desc, desc_buf, 256, kCFStringEncodingUTF8); return absl::Status(absl::StatusCode::kUnknown, absl::StrFormat("(domain:%s, code:%ld, description:%s)", domain_buf, code, desc_buf)); } absl::StatusOr CFReadStreamLocallAddress( CFReadStreamRef stream) { CFTypeUniqueRef cf_native_handle = static_cast( CFReadStreamCopyProperty(stream, kCFStreamPropertySocketNativeHandle)); CFSocketNativeHandle socket; CFDataGetBytes(cf_native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*)&socket); EventEngine::ResolvedAddress addr; socklen_t len = EventEngine::ResolvedAddress::MAX_SIZE_BYTES; if (getsockname(socket, const_cast(addr.address()), &len) < 0) { return absl::InternalError( absl::StrCat("getsockname:", grpc_core::StrError(errno))); } return EventEngine::ResolvedAddress(addr.address(), len); } } // namespace bool CFStreamEndpointImpl::CancelConnect(absl::Status status) { GRPC_EVENT_ENGINE_ENDPOINT_TRACE( "CFStreamEndpointImpl::CancelConnect: status: %s, this: %p", status.ToString().c_str(), this); return open_event_.SetShutdown(std::move(status)); } void CFStreamEndpointImpl::Connect( absl::AnyInvocable on_connect, EventEngine::ResolvedAddress addr) { auto addr_uri = ResolvedAddressToURI(addr); if (!addr_uri.ok()) { on_connect(std::move(addr_uri).status()); return; } GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Connect: %s", addr_uri.value().c_str()); peer_address_ = std::move(addr); auto host_port = ResolvedAddressToNormalizedString(peer_address_); if (!host_port.ok()) { on_connect(std::move(host_port).status()); return; } peer_address_string_ = host_port.value(); GRPC_EVENT_ENGINE_ENDPOINT_TRACE( "CFStreamEndpointImpl::Connect, host_port: %s", host_port->c_str()); std::string host_string; std::string port_string; grpc_core::SplitHostPort(host_port.value(), &host_string, &port_string); CFStringRef host = CFStringCreateWithCString(NULL, host_string.c_str(), kCFStringEncodingUTF8); int port = ResolvedAddressGetPort(peer_address_); CFStreamCreatePairWithSocketToHost(NULL, host, port, &cf_read_stream_, &cf_write_stream_); CFStreamClientContext cf_context = {0, this, Retain, Release, nullptr}; CFReadStreamSetClient( cf_read_stream_, kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable | kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, ReadCallback, &cf_context); CFWriteStreamSetClient( cf_write_stream_, kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes | kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered, WriteCallback, &cf_context); CFReadStreamSetDispatchQueue(cf_read_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)); CFWriteStreamSetDispatchQueue( cf_write_stream_, dispatch_get_global_queue(QOS_CLASS_DEFAULT, 0)); if (!CFReadStreamOpen(cf_read_stream_)) { auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); on_connect(std::move(status)); return; } if (!CFWriteStreamOpen(cf_write_stream_)) { auto status = CFErrorToStatus(CFWriteStreamCopyError(cf_write_stream_)); on_connect(std::move(status)); return; } open_event_.NotifyOn(new PosixEngineClosure( [that = Ref(), on_connect = std::move(on_connect)](absl::Status status) mutable { if (!status.ok()) { on_connect(std::move(status)); return; } auto local_addr = CFReadStreamLocallAddress(that->cf_read_stream_); if (!local_addr.ok()) { on_connect(std::move(local_addr).status()); return; } that->local_address_ = local_addr.value(); that->local_address_string_ = *ResolvedAddressToURI(that->local_address_); on_connect(absl::OkStatus()); }, false /* is_permanent */)); } /* static */ void CFStreamEndpointImpl::ReadCallback( CFReadStreamRef stream, CFStreamEventType type, void* client_callback_info) { auto self = static_cast(client_callback_info); GRPC_EVENT_ENGINE_ENDPOINT_TRACE( "CFStreamEndpointImpl::ReadCallback, type: %lu, this: %p", type, self); switch (type) { case kCFStreamEventOpenCompleted: // wait for write stream open completed to signal connection ready break; case kCFStreamEventHasBytesAvailable: ABSL_FALLTHROUGH_INTENDED; case kCFStreamEventEndEncountered: self->read_event_.SetReady(); break; case kCFStreamEventErrorOccurred: { auto status = CFErrorToStatus(CFReadStreamCopyError(stream)); GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Read error: %s", status.ToString().c_str()); self->open_event_.SetShutdown(status); self->read_event_.SetShutdown(status); self->write_event_.SetShutdown(status); } break; default: GPR_UNREACHABLE_CODE(return); } } /* static */ void CFStreamEndpointImpl::WriteCallback(CFWriteStreamRef stream, CFStreamEventType type, void* client_callback_info) { auto self = static_cast(client_callback_info); GRPC_EVENT_ENGINE_ENDPOINT_TRACE( "CFStreamEndpointImpl::WriteCallback, type: %lu, this: %p", type, self); switch (type) { case kCFStreamEventOpenCompleted: self->open_event_.SetReady(); break; case kCFStreamEventCanAcceptBytes: ABSL_FALLTHROUGH_INTENDED; case kCFStreamEventEndEncountered: self->write_event_.SetReady(); break; case kCFStreamEventErrorOccurred: { auto status = CFErrorToStatus(CFWriteStreamCopyError(stream)); GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream Write error: %s", status.ToString().c_str()); self->open_event_.SetShutdown(status); self->read_event_.SetShutdown(status); self->write_event_.SetShutdown(status); } break; default: GPR_UNREACHABLE_CODE(return); } } CFStreamEndpointImpl::CFStreamEndpointImpl( std::shared_ptr engine, MemoryAllocator memory_allocator) : engine_(std::move(engine)), memory_allocator_(std::move(memory_allocator)), open_event_(engine_.get()), read_event_(engine_.get()), write_event_(engine_.get()) { open_event_.InitEvent(); read_event_.InitEvent(); write_event_.InitEvent(); } CFStreamEndpointImpl::~CFStreamEndpointImpl() { open_event_.DestroyEvent(); read_event_.DestroyEvent(); write_event_.DestroyEvent(); } void CFStreamEndpointImpl::Shutdown() { GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Shutdown: this: %p", this); auto shutdownStatus = absl::Status(absl::StatusCode::kUnknown, absl::StrFormat("Shutting down CFStreamEndpointImpl")); open_event_.SetShutdown(shutdownStatus); read_event_.SetShutdown(shutdownStatus); write_event_.SetShutdown(shutdownStatus); CFReadStreamSetClient(cf_read_stream_, kCFStreamEventNone, nullptr, nullptr); CFWriteStreamSetClient(cf_write_stream_, kCFStreamEventNone, nullptr, nullptr); CFReadStreamClose(cf_read_stream_); CFWriteStreamClose(cf_write_stream_); } bool CFStreamEndpointImpl::Read( absl::AnyInvocable on_read, SliceBuffer* buffer, const EventEngine::Endpoint::ReadArgs* /* args */) { GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Read, this: %p", this); read_event_.NotifyOn(new PosixEngineClosure( [that = Ref(), on_read = std::move(on_read), buffer](absl::Status status) mutable { if (status.ok()) { that->DoRead(std::move(on_read), buffer); } else { on_read(status); } }, false /* is_permanent*/)); return false; } void CFStreamEndpointImpl::DoRead( absl::AnyInvocable on_read, SliceBuffer* buffer) { GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoRead, this: %p", this); auto buffer_index = buffer->AppendIndexed( Slice(memory_allocator_.MakeSlice(kDefaultReadBufferSize))); CFIndex read_size = CFReadStreamRead( cf_read_stream_, internal::SliceCast(buffer->MutableSliceAt(buffer_index)) .begin(), kDefaultReadBufferSize); if (read_size < 0) { auto status = CFErrorToStatus(CFReadStreamCopyError(cf_read_stream_)); GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStream read error: %s, read_size: %ld", status.ToString().c_str(), read_size); on_read(status); return; } buffer->RemoveLastNBytes(buffer->Length() - read_size); on_read(absl::OkStatus()); } bool CFStreamEndpointImpl::Write( absl::AnyInvocable on_writable, SliceBuffer* data, const EventEngine::Endpoint::WriteArgs* /* args */) { GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::Write, this: %p", this); write_event_.NotifyOn(new PosixEngineClosure( [that = Ref(), on_writable = std::move(on_writable), data](absl::Status status) mutable { if (status.ok()) { that->DoWrite(std::move(on_writable), data); } else { on_writable(status); } }, false /* is_permanent*/)); return false; } void CFStreamEndpointImpl::DoWrite( absl::AnyInvocable on_writable, SliceBuffer* data) { GRPC_EVENT_ENGINE_ENDPOINT_TRACE("CFStreamEndpointImpl::DoWrite, this: %p", this); size_t total_written_size = 0; for (size_t i = 0; i < data->Count(); i++) { auto slice = data->RefSlice(i); size_t written_size = CFWriteStreamWrite(cf_write_stream_, slice.begin(), slice.size()); total_written_size += written_size; if (written_size < slice.size()) { SliceBuffer written; data->MoveFirstNBytesIntoSliceBuffer(total_written_size, written); write_event_.NotifyOn(new PosixEngineClosure( [that = Ref(), on_writable = std::move(on_writable), data](absl::Status status) mutable { if (status.ok()) { that->DoWrite(std::move(on_writable), data); } else { on_writable(status); } }, false /* is_permanent*/)); return; } } on_writable(absl::OkStatus()); } } // namespace experimental } // namespace grpc_event_engine #endif // GPR_APPLE