/* * * Copyright 2018 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include #include "src/core/lib/iomgr/port.h" #ifdef GRPC_CFSTREAM_ENDPOINT #import #include #include #include #include "src/core/lib/address_utils/sockaddr_utils.h" #include "src/core/lib/gpr/string.h" #include "src/core/lib/iomgr/cfstream_handle.h" #include "src/core/lib/iomgr/closure.h" #include "src/core/lib/iomgr/endpoint.h" #import "src/core/lib/iomgr/endpoint_cfstream.h" #include "src/core/lib/iomgr/error_cfstream.h" #include "src/core/lib/iomgr/sockaddr.h" #include "src/core/lib/slice/slice_internal.h" #include "src/core/lib/slice/slice_string_helpers.h" extern grpc_core::TraceFlag grpc_tcp_trace; struct CFStreamEndpoint { grpc_endpoint base; gpr_refcount refcount; CFReadStreamRef read_stream; CFWriteStreamRef write_stream; CFStreamHandle* stream_sync; grpc_closure* read_cb; grpc_closure* write_cb; grpc_slice_buffer* read_slices; grpc_slice_buffer* write_slices; grpc_closure read_action; grpc_closure write_action; std::string peer_string; std::string local_address; grpc_slice_allocator* slice_allocator; }; static void CFStreamFree(CFStreamEndpoint* ep) { grpc_slice_allocator_destroy(ep->slice_allocator); CFRelease(ep->read_stream); CFRelease(ep->write_stream); CFSTREAM_HANDLE_UNREF(ep->stream_sync, "free"); delete ep; } #ifndef NDEBUG #define EP_REF(ep, reason) CFStreamRef((ep), (reason), __FILE__, __LINE__) #define EP_UNREF(ep, reason) CFStreamUnref((ep), (reason), __FILE__, __LINE__) static void CFStreamUnref(CFStreamEndpoint* ep, const char* reason, const char* file, int line) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CFStream endpoint unref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val, val - 1); } if (gpr_unref(&ep->refcount)) { CFStreamFree(ep); } } static void CFStreamRef(CFStreamEndpoint* ep, const char* reason, const char* file, int line) { if (grpc_tcp_trace.enabled()) { gpr_atm val = gpr_atm_no_barrier_load(&ep->refcount.count); gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG, "CFStream endpoint ref %p : %s %" PRIdPTR " -> %" PRIdPTR, ep, reason, val, val + 1); } gpr_ref(&ep->refcount); } #else #define EP_REF(ep, reason) CFStreamRef((ep)) #define EP_UNREF(ep, reason) CFStreamUnref((ep)) static void CFStreamUnref(CFStreamEndpoint* ep) { if (gpr_unref(&ep->refcount)) { CFStreamFree(ep); } } static void CFStreamRef(CFStreamEndpoint* ep) { gpr_ref(&ep->refcount); } #endif static grpc_error_handle CFStreamAnnotateError(grpc_error_handle src_error, CFStreamEndpoint* ep) { return grpc_error_set_str( grpc_error_set_int(src_error, GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE), GRPC_ERROR_STR_TARGET_ADDRESS, ep->peer_string); } static void CallReadCb(CFStreamEndpoint* ep, grpc_error_handle error) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_read_cb %p %p:%p", ep, ep->read_cb, ep->read_cb->cb, ep->read_cb->cb_arg); size_t i; gpr_log(GPR_DEBUG, "read: error=%s", grpc_error_std_string(error).c_str()); for (i = 0; i < ep->read_slices->count; i++) { char* dump = grpc_dump_slice(ep->read_slices->slices[i], GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "READ %p (peer=%s): %s", ep, ep->peer_string.c_str(), dump); gpr_free(dump); } } grpc_closure* cb = ep->read_cb; ep->read_cb = nullptr; ep->read_slices = nullptr; grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } static void CallWriteCb(CFStreamEndpoint* ep, grpc_error_handle error) { if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p call_write_cb %p %p:%p", ep, ep->write_cb, ep->write_cb->cb, ep->write_cb->cb_arg); gpr_log(GPR_DEBUG, "write: error=%s", grpc_error_std_string(error).c_str()); } grpc_closure* cb = ep->write_cb; ep->write_cb = nullptr; ep->write_slices = nullptr; grpc_core::ExecCtx::Run(DEBUG_LOCATION, cb, error); } static void ReadAction(void* arg, grpc_error_handle error) { CFStreamEndpoint* ep = static_cast(arg); GPR_ASSERT(ep->read_cb != nullptr); if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CallReadCb(ep, GRPC_ERROR_REF(error)); EP_UNREF(ep, "read"); return; } GPR_ASSERT(ep->read_slices->count == 1); grpc_slice slice = ep->read_slices->slices[0]; size_t len = GRPC_SLICE_LENGTH(slice); CFIndex read_size = CFReadStreamRead(ep->read_stream, GRPC_SLICE_START_PTR(slice), len); if (read_size == -1) { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CFErrorRef stream_error = CFReadStreamCopyError(ep->read_stream); if (stream_error != nullptr) { error = CFStreamAnnotateError( GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "Read error"), ep); CFRelease(stream_error); } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Read error"); } CallReadCb(ep, error); EP_UNREF(ep, "read"); } else if (read_size == 0) { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CallReadCb(ep, CFStreamAnnotateError( GRPC_ERROR_CREATE_FROM_STATIC_STRING("Socket closed"), ep)); EP_UNREF(ep, "read"); } else { if (read_size < static_cast(len)) { grpc_slice_buffer_trim_end(ep->read_slices, len - read_size, nullptr); } CallReadCb(ep, GRPC_ERROR_NONE); EP_UNREF(ep, "read"); } } static void WriteAction(void* arg, grpc_error_handle error) { CFStreamEndpoint* ep = static_cast(arg); GPR_ASSERT(ep->write_cb != nullptr); if (error != GRPC_ERROR_NONE) { grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); CallWriteCb(ep, GRPC_ERROR_REF(error)); EP_UNREF(ep, "write"); return; } grpc_slice slice = grpc_slice_buffer_take_first(ep->write_slices); size_t slice_len = GRPC_SLICE_LENGTH(slice); CFIndex write_size = CFWriteStreamWrite( ep->write_stream, GRPC_SLICE_START_PTR(slice), slice_len); if (write_size == -1) { grpc_slice_buffer_reset_and_unref_internal(ep->write_slices); CFErrorRef stream_error = CFWriteStreamCopyError(ep->write_stream); if (stream_error != nullptr) { error = CFStreamAnnotateError( GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write failed."), ep); CFRelease(stream_error); } else { error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("write failed."); } CallWriteCb(ep, error); EP_UNREF(ep, "write"); } else { if (write_size < static_cast(GRPC_SLICE_LENGTH(slice))) { grpc_slice_buffer_undo_take_first( ep->write_slices, grpc_slice_sub(slice, write_size, slice_len)); } if (ep->write_slices->length > 0) { ep->stream_sync->NotifyOnWrite(&ep->write_action); } else { CallWriteCb(ep, GRPC_ERROR_NONE); EP_UNREF(ep, "write"); } if (grpc_tcp_trace.enabled()) { grpc_slice trace_slice = grpc_slice_sub(slice, 0, write_size); char* dump = grpc_dump_slice(trace_slice, GPR_DUMP_HEX | GPR_DUMP_ASCII); gpr_log(GPR_DEBUG, "WRITE %p (peer=%s): %s", ep, ep->peer_string.c_str(), dump); gpr_free(dump); grpc_slice_unref_internal(trace_slice); } } grpc_slice_unref_internal(slice); } static void CFStreamReadAllocationDone(void* arg, grpc_error_handle error) { CFStreamEndpoint* ep = static_cast(arg); if (error == GRPC_ERROR_NONE) { ep->stream_sync->NotifyOnRead(&ep->read_action); } else { grpc_slice_buffer_reset_and_unref_internal(ep->read_slices); CallReadCb(ep, error); EP_UNREF(ep, "read"); } } static void CFStreamRead(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, bool urgent) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p read (%p, %p) length:%zu", ep_impl, slices, cb, slices->length); } GPR_ASSERT(ep_impl->read_cb == nullptr); ep_impl->read_cb = cb; ep_impl->read_slices = slices; grpc_slice_buffer_reset_and_unref_internal(slices); EP_REF(ep_impl, "read"); if (grpc_slice_allocator_allocate( ep_impl->slice_allocator, GRPC_TCP_DEFAULT_READ_SLICE_SIZE, 1, grpc_slice_allocator_intent::kReadBuffer, ep_impl->read_slices, CFStreamReadAllocationDone, ep_impl)) { ep_impl->stream_sync->NotifyOnRead(&ep_impl->read_action); } } static void CFStreamWrite(grpc_endpoint* ep, grpc_slice_buffer* slices, grpc_closure* cb, void* arg) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p write (%p, %p) length:%zu", ep_impl, slices, cb, slices->length); } GPR_ASSERT(ep_impl->write_cb == nullptr); ep_impl->write_cb = cb; ep_impl->write_slices = slices; EP_REF(ep_impl, "write"); ep_impl->stream_sync->NotifyOnWrite(&ep_impl->write_action); } void CFStreamShutdown(grpc_endpoint* ep, grpc_error_handle why) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown (%s)", ep_impl, grpc_error_std_string(why).c_str()); } CFReadStreamClose(ep_impl->read_stream); CFWriteStreamClose(ep_impl->write_stream); ep_impl->stream_sync->Shutdown(why); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p shutdown DONE (%s)", ep_impl, grpc_error_std_string(why).c_str()); } } void CFStreamDestroy(grpc_endpoint* ep) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p destroy", ep_impl); } EP_UNREF(ep_impl, "destroy"); } absl::string_view CFStreamGetPeer(grpc_endpoint* ep) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); return ep_impl->peer_string; } absl::string_view CFStreamGetLocalAddress(grpc_endpoint* ep) { CFStreamEndpoint* ep_impl = reinterpret_cast(ep); return ep_impl->local_address; } int CFStreamGetFD(grpc_endpoint* ep) { return 0; } bool CFStreamCanTrackErr(grpc_endpoint* ep) { return false; } void CFStreamAddToPollset(grpc_endpoint* ep, grpc_pollset* pollset) {} void CFStreamAddToPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} void CFStreamDeleteFromPollsetSet(grpc_endpoint* ep, grpc_pollset_set* pollset) {} static const grpc_endpoint_vtable vtable = {CFStreamRead, CFStreamWrite, CFStreamAddToPollset, CFStreamAddToPollsetSet, CFStreamDeleteFromPollsetSet, CFStreamShutdown, CFStreamDestroy, CFStreamGetPeer, CFStreamGetLocalAddress, CFStreamGetFD, CFStreamCanTrackErr}; grpc_endpoint* grpc_cfstream_endpoint_create( CFReadStreamRef read_stream, CFWriteStreamRef write_stream, const char* peer_string, grpc_slice_allocator* slice_allocator, CFStreamHandle* stream_sync) { CFStreamEndpoint* ep_impl = new CFStreamEndpoint; if (grpc_tcp_trace.enabled()) { gpr_log(GPR_DEBUG, "CFStream endpoint:%p create readStream:%p writeStream: %p", ep_impl, read_stream, write_stream); } ep_impl->base.vtable = &vtable; gpr_ref_init(&ep_impl->refcount, 1); ep_impl->read_stream = read_stream; ep_impl->write_stream = write_stream; CFRetain(read_stream); CFRetain(write_stream); ep_impl->stream_sync = stream_sync; CFSTREAM_HANDLE_REF(ep_impl->stream_sync, "endpoint create"); ep_impl->peer_string = peer_string; grpc_resolved_address resolved_local_addr; resolved_local_addr.len = sizeof(resolved_local_addr.addr); CFDataRef native_handle = static_cast(CFReadStreamCopyProperty( ep_impl->read_stream, kCFStreamPropertySocketNativeHandle)); CFSocketNativeHandle sockfd; CFDataGetBytes(native_handle, CFRangeMake(0, sizeof(CFSocketNativeHandle)), (UInt8*)&sockfd); if (native_handle) { CFRelease(native_handle); } if (getsockname(sockfd, reinterpret_cast(resolved_local_addr.addr), &resolved_local_addr.len) < 0) { ep_impl->local_address = ""; } else { ep_impl->local_address = grpc_sockaddr_to_uri(&resolved_local_addr); } ep_impl->read_cb = nil; ep_impl->write_cb = nil; ep_impl->read_slices = nil; ep_impl->write_slices = nil; GRPC_CLOSURE_INIT(&ep_impl->read_action, ReadAction, static_cast(ep_impl), grpc_schedule_on_exec_ctx); GRPC_CLOSURE_INIT(&ep_impl->write_action, WriteAction, static_cast(ep_impl), grpc_schedule_on_exec_ctx); ep_impl->slice_allocator = slice_allocator; return &ep_impl->base; } #endif /* GRPC_CFSTREAM_ENDPOINT */