ext/libuv/src/win/pipe.c in libuv-0.11.21 vs ext/libuv/src/win/pipe.c in libuv-0.11.22
- old
+ new
@@ -1,1747 +1,1747 @@
-/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
-
-#include <assert.h>
-#include <io.h>
-#include <string.h>
-#include <stdio.h>
-#include <stdlib.h>
-
-#include "uv.h"
-#include "internal.h"
-#include "handle-inl.h"
-#include "stream-inl.h"
-#include "req-inl.h"
-
-
-/* A zero-size buffer for use by uv_pipe_read */
-static char uv_zero_[] = "";
-
-/* Null uv_buf_t */
-static const uv_buf_t uv_null_buf_ = { 0, NULL };
-
-/* The timeout that the pipe will wait for the remote end to write data */
-/* when the local ends wants to shut it down. */
-static const int64_t eof_timeout = 50; /* ms */
-
-static const int default_pending_pipe_instances = 4;
-
-/* IPC protocol flags. */
-#define UV_IPC_RAW_DATA 0x0001
-#define UV_IPC_TCP_SERVER 0x0002
-#define UV_IPC_TCP_CONNECTION 0x0004
-
-/* IPC frame header. */
-typedef struct {
- int flags;
- uint64_t raw_data_length;
-} uv_ipc_frame_header_t;
-
-/* IPC frame, which contains an imported TCP socket stream. */
-typedef struct {
- uv_ipc_frame_header_t header;
- WSAPROTOCOL_INFOW socket_info;
-} uv_ipc_frame_uv_stream;
-
-static void eof_timer_init(uv_pipe_t* pipe);
-static void eof_timer_start(uv_pipe_t* pipe);
-static void eof_timer_stop(uv_pipe_t* pipe);
-static void eof_timer_cb(uv_timer_t* timer, int status);
-static void eof_timer_destroy(uv_pipe_t* pipe);
-static void eof_timer_close_cb(uv_handle_t* handle);
-
-
-static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
- _snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%u", ptr, GetCurrentProcessId());
-}
-
-
-int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
- uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
-
- handle->reqs_pending = 0;
- handle->handle = INVALID_HANDLE_VALUE;
- handle->name = NULL;
- handle->ipc_pid = 0;
- handle->remaining_ipc_rawdata_bytes = 0;
- handle->pending_ipc_info.socket_info = NULL;
- handle->pending_ipc_info.tcp_connection = 0;
- handle->ipc = ipc;
- handle->non_overlapped_writes_tail = NULL;
-
- uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
-
- return 0;
-}
-
-
-static void uv_pipe_connection_init(uv_pipe_t* handle) {
- uv_connection_init((uv_stream_t*) handle);
- handle->read_req.data = handle;
- handle->eof_timer = NULL;
-}
-
-
-static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
- HANDLE pipeHandle;
-
- /*
- * Assume that we have a duplex pipe first, so attempt to
- * connect with GENERIC_READ | GENERIC_WRITE.
- */
- pipeHandle = CreateFileW(name,
- GENERIC_READ | GENERIC_WRITE,
- 0,
- NULL,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- NULL);
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
- return pipeHandle;
- }
-
- /*
- * If the pipe is not duplex CreateFileW fails with
- * ERROR_ACCESS_DENIED. In that case try to connect
- * as a read-only or write-only.
- */
- if (GetLastError() == ERROR_ACCESS_DENIED) {
- pipeHandle = CreateFileW(name,
- GENERIC_READ | FILE_WRITE_ATTRIBUTES,
- 0,
- NULL,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- *duplex_flags = UV_HANDLE_READABLE;
- return pipeHandle;
- }
- }
-
- if (GetLastError() == ERROR_ACCESS_DENIED) {
- pipeHandle = CreateFileW(name,
- GENERIC_WRITE | FILE_READ_ATTRIBUTES,
- 0,
- NULL,
- OPEN_EXISTING,
- FILE_FLAG_OVERLAPPED,
- NULL);
-
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- *duplex_flags = UV_HANDLE_WRITABLE;
- return pipeHandle;
- }
- }
-
- return INVALID_HANDLE_VALUE;
-}
-
-
-int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
- char* name, size_t nameSize) {
- HANDLE pipeHandle;
- int err;
- char* ptr = (char*)handle;
-
- for (;;) {
- uv_unique_pipe_name(ptr, name, nameSize);
-
- pipeHandle = CreateNamedPipeA(name,
- access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
- NULL);
-
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- /* No name collisions. We're done. */
- break;
- }
-
- err = GetLastError();
- if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
- goto error;
- }
-
- /* Pipe name collision. Increment the pointer and try again. */
- ptr++;
- }
-
- if (CreateIoCompletionPort(pipeHandle,
- loop->iocp,
- (ULONG_PTR)handle,
- 0) == NULL) {
- err = GetLastError();
- goto error;
- }
-
- uv_pipe_connection_init(handle);
- handle->handle = pipeHandle;
-
- return 0;
-
- error:
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- CloseHandle(pipeHandle);
- }
-
- return err;
-}
-
-
-static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
- HANDLE pipeHandle, DWORD duplex_flags) {
- NTSTATUS nt_status;
- IO_STATUS_BLOCK io_status;
- FILE_MODE_INFORMATION mode_info;
- DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
-
- if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
- /* If this returns ERROR_INVALID_PARAMETER we probably opened something */
- /* that is not a pipe. */
- if (GetLastError() == ERROR_INVALID_PARAMETER) {
- SetLastError(WSAENOTSOCK);
- }
- return -1;
- }
-
- /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
- nt_status = pNtQueryInformationFile(pipeHandle,
- &io_status,
- &mode_info,
- sizeof(mode_info),
- FileModeInformation);
- if (nt_status != STATUS_SUCCESS) {
- return -1;
- }
-
- if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
- mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
- /* Non-overlapped pipe. */
- handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
- } else {
- /* Overlapped pipe. Try to associate with IOCP. */
- if (CreateIoCompletionPort(pipeHandle,
- loop->iocp,
- (ULONG_PTR)handle,
- 0) == NULL) {
- handle->flags |= UV_HANDLE_EMULATE_IOCP;
- }
- }
-
- handle->handle = pipeHandle;
- handle->flags |= duplex_flags;
-
- return 0;
-}
-
-
-static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
- uv_loop_t* loop;
- uv_pipe_t* handle;
- uv_shutdown_t* req;
-
- req = (uv_shutdown_t*) parameter;
- assert(req);
- handle = (uv_pipe_t*) req->handle;
- assert(handle);
- loop = handle->loop;
- assert(loop);
-
- FlushFileBuffers(handle->handle);
-
- /* Post completed */
- POST_COMPLETION_FOR_REQ(loop, req);
-
- return 0;
-}
-
-
-void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
- int err;
- DWORD result;
- uv_shutdown_t* req;
- NTSTATUS nt_status;
- IO_STATUS_BLOCK io_status;
- FILE_PIPE_LOCAL_INFORMATION pipe_info;
-
- if ((handle->flags & UV_HANDLE_CONNECTION) &&
- handle->shutdown_req != NULL &&
- handle->write_reqs_pending == 0) {
- req = handle->shutdown_req;
-
- /* Clear the shutdown_req field so we don't go here again. */
- handle->shutdown_req = NULL;
-
- if (handle->flags & UV__HANDLE_CLOSING) {
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- /* Already closing. Cancel the shutdown. */
- if (req->cb) {
- req->cb(req, UV_ECANCELED);
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
- return;
- }
-
- /* Try to avoid flushing the pipe buffer in the thread pool. */
- nt_status = pNtQueryInformationFile(handle->handle,
- &io_status,
- &pipe_info,
- sizeof pipe_info,
- FilePipeLocalInformation);
-
- if (nt_status != STATUS_SUCCESS) {
- /* Failure */
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
- if (req->cb) {
- err = pRtlNtStatusToDosError(nt_status);
- req->cb(req, uv_translate_sys_error(err));
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
- return;
- }
-
- if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
- /* Short-circuit, no need to call FlushFileBuffers. */
- uv_insert_pending_req(loop, (uv_req_t*) req);
- return;
- }
-
- /* Run FlushFileBuffers in the thread pool. */
- result = QueueUserWorkItem(pipe_shutdown_thread_proc,
- req,
- WT_EXECUTELONGFUNCTION);
- if (result) {
- return;
-
- } else {
- /* Failure. */
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
- if (req->cb) {
- err = GetLastError();
- req->cb(req, uv_translate_sys_error(err));
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
- return;
- }
- }
-
- if (handle->flags & UV__HANDLE_CLOSING &&
- handle->reqs_pending == 0) {
- assert(!(handle->flags & UV_HANDLE_CLOSED));
-
- if (handle->flags & UV_HANDLE_CONNECTION) {
- if (handle->pending_ipc_info.socket_info) {
- free(handle->pending_ipc_info.socket_info);
- handle->pending_ipc_info.socket_info = NULL;
- }
-
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
- UnregisterWait(handle->read_req.wait_handle);
- handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
- }
- if (handle->read_req.event_handle) {
- CloseHandle(handle->read_req.event_handle);
- handle->read_req.event_handle = NULL;
- }
- }
- }
-
- if (handle->flags & UV_HANDLE_PIPESERVER) {
- assert(handle->accept_reqs);
- free(handle->accept_reqs);
- handle->accept_reqs = NULL;
- }
-
- uv__handle_close(handle);
- }
-}
-
-
-void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
- handle->pending_instances = count;
- handle->flags |= UV_HANDLE_PIPESERVER;
-}
-
-
-/* Creates a pipe server. */
-int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
- uv_loop_t* loop = handle->loop;
- int i, err, nameSize;
- uv_pipe_accept_t* req;
-
- if (handle->flags & UV_HANDLE_BOUND) {
- return UV_EINVAL;
- }
-
- if (!name) {
- return UV_EINVAL;
- }
-
- if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
- handle->pending_instances = default_pending_pipe_instances;
- }
-
- handle->accept_reqs = (uv_pipe_accept_t*)
- malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
- if (!handle->accept_reqs) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
- }
-
- for (i = 0; i < handle->pending_instances; i++) {
- req = &handle->accept_reqs[i];
- uv_req_init(loop, (uv_req_t*) req);
- req->type = UV_ACCEPT;
- req->data = handle;
- req->pipeHandle = INVALID_HANDLE_VALUE;
- req->next_pending = NULL;
- }
-
- /* Convert name to UTF16. */
- nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
- handle->name = (WCHAR*)malloc(nameSize);
- if (!handle->name) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
- }
-
- if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
- return uv_translate_sys_error(GetLastError());
- }
-
- /*
- * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
- * If this fails then there's already a pipe server for the given pipe name.
- */
- handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
- FILE_FLAG_FIRST_PIPE_INSTANCE,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
- PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
-
- if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
- err = GetLastError();
- if (err == ERROR_ACCESS_DENIED) {
- err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
- } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
- err = WSAEACCES; /* Translates to UV_EACCES. */
- }
- goto error;
- }
-
- if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
- err = GetLastError();
- goto error;
- }
-
- handle->pending_accepts = NULL;
- handle->flags |= UV_HANDLE_PIPESERVER;
- handle->flags |= UV_HANDLE_BOUND;
-
- return 0;
-
-error:
- if (handle->name) {
- free(handle->name);
- handle->name = NULL;
- }
-
- if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
- CloseHandle(handle->accept_reqs[0].pipeHandle);
- handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
- }
-
- return uv_translate_sys_error(err);
-}
-
-
-static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
- uv_loop_t* loop;
- uv_pipe_t* handle;
- uv_connect_t* req;
- HANDLE pipeHandle = INVALID_HANDLE_VALUE;
- DWORD duplex_flags;
-
- req = (uv_connect_t*) parameter;
- assert(req);
- handle = (uv_pipe_t*) req->handle;
- assert(handle);
- loop = handle->loop;
- assert(loop);
-
- /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
- /* We wait for the pipe to become available with WaitNamedPipe. */
- while (WaitNamedPipeW(handle->name, 30000)) {
- /* The pipe is now available, try to connect. */
- pipeHandle = open_named_pipe(handle->name, &duplex_flags);
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- break;
- }
-
- SwitchToThread();
- }
-
- if (pipeHandle != INVALID_HANDLE_VALUE &&
- !uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
- SET_REQ_SUCCESS(req);
- } else {
- SET_REQ_ERROR(req, GetLastError());
- }
-
- /* Post completed */
- POST_COMPLETION_FOR_REQ(loop, req);
-
- return 0;
-}
-
-
-void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
- const char* name, uv_connect_cb cb) {
- uv_loop_t* loop = handle->loop;
- int err, nameSize;
- HANDLE pipeHandle = INVALID_HANDLE_VALUE;
- DWORD duplex_flags;
-
- uv_req_init(loop, (uv_req_t*) req);
- req->type = UV_CONNECT;
- req->handle = (uv_stream_t*) handle;
- req->cb = cb;
-
- /* Convert name to UTF16. */
- nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
- handle->name = (WCHAR*)malloc(nameSize);
- if (!handle->name) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
- }
-
- if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
- err = GetLastError();
- goto error;
- }
-
- pipeHandle = open_named_pipe(handle->name, &duplex_flags);
- if (pipeHandle == INVALID_HANDLE_VALUE) {
- if (GetLastError() == ERROR_PIPE_BUSY) {
- /* Wait for the server to make a pipe instance available. */
- if (!QueueUserWorkItem(&pipe_connect_thread_proc,
- req,
- WT_EXECUTELONGFUNCTION)) {
- err = GetLastError();
- goto error;
- }
-
- REGISTER_HANDLE_REQ(loop, handle, req);
- handle->reqs_pending++;
-
- return;
- }
-
- err = GetLastError();
- goto error;
- }
-
- assert(pipeHandle != INVALID_HANDLE_VALUE);
-
- if (uv_set_pipe_handle(loop,
- (uv_pipe_t*) req->handle,
- pipeHandle,
- duplex_flags)) {
- err = GetLastError();
- goto error;
- }
-
- SET_REQ_SUCCESS(req);
- uv_insert_pending_req(loop, (uv_req_t*) req);
- handle->reqs_pending++;
- REGISTER_HANDLE_REQ(loop, handle, req);
- return;
-
-error:
- if (handle->name) {
- free(handle->name);
- handle->name = NULL;
- }
-
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- CloseHandle(pipeHandle);
- }
-
- /* Make this req pending reporting an error. */
- SET_REQ_ERROR(req, err);
- uv_insert_pending_req(loop, (uv_req_t*) req);
- handle->reqs_pending++;
- REGISTER_HANDLE_REQ(loop, handle, req);
- return;
-}
-
-
-/* Cleans up uv_pipe_t (server or connection) and all resources associated */
-/* with it. */
-void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
- int i;
- HANDLE pipeHandle;
-
- if (handle->name) {
- free(handle->name);
- handle->name = NULL;
- }
-
- if (handle->flags & UV_HANDLE_PIPESERVER) {
- for (i = 0; i < handle->pending_instances; i++) {
- pipeHandle = handle->accept_reqs[i].pipeHandle;
- if (pipeHandle != INVALID_HANDLE_VALUE) {
- CloseHandle(pipeHandle);
- handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
- }
- }
- }
-
- if (handle->flags & UV_HANDLE_CONNECTION) {
- handle->flags &= ~UV_HANDLE_WRITABLE;
- eof_timer_destroy(handle);
- }
-
- if ((handle->flags & UV_HANDLE_CONNECTION)
- && handle->handle != INVALID_HANDLE_VALUE) {
- CloseHandle(handle->handle);
- handle->handle = INVALID_HANDLE_VALUE;
- }
-}
-
-
-void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
- if (handle->flags & UV_HANDLE_READING) {
- handle->flags &= ~UV_HANDLE_READING;
- DECREASE_ACTIVE_COUNT(loop, handle);
- }
-
- if (handle->flags & UV_HANDLE_LISTENING) {
- handle->flags &= ~UV_HANDLE_LISTENING;
- DECREASE_ACTIVE_COUNT(loop, handle);
- }
-
- uv_pipe_cleanup(loop, handle);
-
- if (handle->reqs_pending == 0) {
- uv_want_endgame(loop, (uv_handle_t*) handle);
- }
-
- handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
- uv__handle_closing(handle);
-}
-
-
-static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
- uv_pipe_accept_t* req, BOOL firstInstance) {
- assert(handle->flags & UV_HANDLE_LISTENING);
-
- if (!firstInstance) {
- assert(req->pipeHandle == INVALID_HANDLE_VALUE);
-
- req->pipeHandle = CreateNamedPipeW(handle->name,
- PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
- PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
- PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
-
- if (req->pipeHandle == INVALID_HANDLE_VALUE) {
- SET_REQ_ERROR(req, GetLastError());
- uv_insert_pending_req(loop, (uv_req_t*) req);
- handle->reqs_pending++;
- return;
- }
-
- if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) {
- CloseHandle(req->pipeHandle);
- req->pipeHandle = INVALID_HANDLE_VALUE;
- SET_REQ_ERROR(req, GetLastError());
- uv_insert_pending_req(loop, (uv_req_t*) req);
- handle->reqs_pending++;
- return;
- }
- }
-
- assert(req->pipeHandle != INVALID_HANDLE_VALUE);
-
- /* Prepare the overlapped structure. */
- memset(&(req->overlapped), 0, sizeof(req->overlapped));
-
- if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) &&
- GetLastError() != ERROR_IO_PENDING) {
- if (GetLastError() == ERROR_PIPE_CONNECTED) {
- SET_REQ_SUCCESS(req);
- } else {
- CloseHandle(req->pipeHandle);
- req->pipeHandle = INVALID_HANDLE_VALUE;
- /* Make this req pending reporting an error. */
- SET_REQ_ERROR(req, GetLastError());
- }
- uv_insert_pending_req(loop, (uv_req_t*) req);
- handle->reqs_pending++;
- return;
- }
-
- handle->reqs_pending++;
-}
-
-
-int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
- uv_loop_t* loop = server->loop;
- uv_pipe_t* pipe_client;
- uv_pipe_accept_t* req;
-
- if (server->ipc) {
- if (!server->pending_ipc_info.socket_info) {
- /* No valid pending sockets. */
- return WSAEWOULDBLOCK;
- }
-
- return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
- server->pending_ipc_info.tcp_connection);
- } else {
- pipe_client = (uv_pipe_t*)client;
-
- /* Find a connection instance that has been connected, but not yet */
- /* accepted. */
- req = server->pending_accepts;
-
- if (!req) {
- /* No valid connections found, so we error out. */
- return WSAEWOULDBLOCK;
- }
-
- /* Initialize the client handle and copy the pipeHandle to the client */
- uv_pipe_connection_init(pipe_client);
- pipe_client->handle = req->pipeHandle;
- pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
-
- /* Prepare the req to pick up a new connection */
- server->pending_accepts = req->next_pending;
- req->next_pending = NULL;
- req->pipeHandle = INVALID_HANDLE_VALUE;
-
- if (!(server->flags & UV__HANDLE_CLOSING)) {
- uv_pipe_queue_accept(loop, server, req, FALSE);
- }
- }
-
- return 0;
-}
-
-
-/* Starts listening for connections for the given pipe. */
-int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
- uv_loop_t* loop = handle->loop;
- int i;
-
- if (handle->flags & UV_HANDLE_LISTENING) {
- handle->connection_cb = cb;
- }
-
- if (!(handle->flags & UV_HANDLE_BOUND)) {
- return WSAEINVAL;
- }
-
- if (handle->flags & UV_HANDLE_READING) {
- return WSAEISCONN;
- }
-
- if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
- return ERROR_NOT_SUPPORTED;
- }
-
- handle->flags |= UV_HANDLE_LISTENING;
- INCREASE_ACTIVE_COUNT(loop, handle);
- handle->connection_cb = cb;
-
- /* First pipe handle should have already been created in uv_pipe_bind */
- assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
-
- for (i = 0; i < handle->pending_instances; i++) {
- uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
- }
-
- return 0;
-}
-
-
-static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
- int result;
- DWORD bytes;
- uv_read_t* req = (uv_read_t*) parameter;
- uv_pipe_t* handle = (uv_pipe_t*) req->data;
- uv_loop_t* loop = handle->loop;
-
- assert(req != NULL);
- assert(req->type == UV_READ);
- assert(handle->type == UV_NAMED_PIPE);
-
- result = ReadFile(handle->handle,
- &uv_zero_,
- 0,
- &bytes,
- NULL);
-
- if (!result) {
- SET_REQ_ERROR(req, GetLastError());
- }
-
- POST_COMPLETION_FOR_REQ(loop, req);
- return 0;
-}
-
-
-static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
- int result;
- DWORD bytes;
- uv_write_t* req = (uv_write_t*) parameter;
- uv_pipe_t* handle = (uv_pipe_t*) req->handle;
- uv_loop_t* loop = handle->loop;
-
- assert(req != NULL);
- assert(req->type == UV_WRITE);
- assert(handle->type == UV_NAMED_PIPE);
- assert(req->write_buffer.base);
-
- result = WriteFile(handle->handle,
- req->write_buffer.base,
- req->write_buffer.len,
- &bytes,
- NULL);
-
- if (!result) {
- SET_REQ_ERROR(req, GetLastError());
- }
-
- POST_COMPLETION_FOR_REQ(loop, req);
- return 0;
-}
-
-
-static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
- uv_read_t* req;
- uv_tcp_t* handle;
-
- req = (uv_read_t*) context;
- assert(req != NULL);
- handle = (uv_tcp_t*)req->data;
- assert(handle != NULL);
- assert(!timed_out);
-
- if (!PostQueuedCompletionStatus(handle->loop->iocp,
- req->overlapped.InternalHigh,
- 0,
- &req->overlapped)) {
- uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
- }
-}
-
-
-static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
- uv_write_t* req;
- uv_tcp_t* handle;
-
- req = (uv_write_t*) context;
- assert(req != NULL);
- handle = (uv_tcp_t*)req->handle;
- assert(handle != NULL);
- assert(!timed_out);
-
- if (!PostQueuedCompletionStatus(handle->loop->iocp,
- req->overlapped.InternalHigh,
- 0,
- &req->overlapped)) {
- uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
- }
-}
-
-
-static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
- uv_read_t* req;
- int result;
-
- assert(handle->flags & UV_HANDLE_READING);
- assert(!(handle->flags & UV_HANDLE_READ_PENDING));
-
- assert(handle->handle != INVALID_HANDLE_VALUE);
-
- req = &handle->read_req;
-
- if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
- if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
- req,
- WT_EXECUTELONGFUNCTION)) {
- /* Make this req pending reporting an error. */
- SET_REQ_ERROR(req, GetLastError());
- goto error;
- }
- } else {
- memset(&req->overlapped, 0, sizeof(req->overlapped));
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
- }
-
- /* Do 0-read */
- result = ReadFile(handle->handle,
- &uv_zero_,
- 0,
- NULL,
- &req->overlapped);
-
- if (!result && GetLastError() != ERROR_IO_PENDING) {
- /* Make this req pending reporting an error. */
- SET_REQ_ERROR(req, GetLastError());
- goto error;
- }
-
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- if (!req->event_handle) {
- req->event_handle = CreateEvent(NULL, 0, 0, NULL);
- if (!req->event_handle) {
- uv_fatal_error(GetLastError(), "CreateEvent");
- }
- }
- if (req->wait_handle == INVALID_HANDLE_VALUE) {
- if (!RegisterWaitForSingleObject(&req->wait_handle,
- req->overlapped.hEvent, post_completion_read_wait, (void*) req,
- INFINITE, WT_EXECUTEINWAITTHREAD)) {
- SET_REQ_ERROR(req, GetLastError());
- goto error;
- }
- }
- }
- }
-
- /* Start the eof timer if there is one */
- eof_timer_start(handle);
- handle->flags |= UV_HANDLE_READ_PENDING;
- handle->reqs_pending++;
- return;
-
-error:
- uv_insert_pending_req(loop, (uv_req_t*)req);
- handle->flags |= UV_HANDLE_READ_PENDING;
- handle->reqs_pending++;
-}
-
-
-static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
- uv_read_cb read_cb, uv_read2_cb read2_cb) {
- uv_loop_t* loop = handle->loop;
-
- handle->flags |= UV_HANDLE_READING;
- INCREASE_ACTIVE_COUNT(loop, handle);
- handle->read_cb = read_cb;
- handle->read2_cb = read2_cb;
- handle->alloc_cb = alloc_cb;
-
- /* If reading was stopped and then started again, there could still be a */
- /* read request pending. */
- if (!(handle->flags & UV_HANDLE_READ_PENDING))
- uv_pipe_queue_read(loop, handle);
-
- return 0;
-}
-
-
-int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
- uv_read_cb read_cb) {
- return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
-}
-
-
-int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
- uv_read2_cb read_cb) {
- return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
-}
-
-
-static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
- uv_write_t* req) {
- req->next_req = NULL;
- if (handle->non_overlapped_writes_tail) {
- req->next_req =
- handle->non_overlapped_writes_tail->next_req;
- handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req;
- handle->non_overlapped_writes_tail = req;
- } else {
- req->next_req = (uv_req_t*)req;
- handle->non_overlapped_writes_tail = req;
- }
-}
-
-
-static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
- uv_write_t* req;
-
- if (handle->non_overlapped_writes_tail) {
- req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
-
- if (req == handle->non_overlapped_writes_tail) {
- handle->non_overlapped_writes_tail = NULL;
- } else {
- handle->non_overlapped_writes_tail->next_req =
- req->next_req;
- }
-
- return req;
- } else {
- /* queue empty */
- return NULL;
- }
-}
-
-
-static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
- uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
- if (req) {
- if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
- req,
- WT_EXECUTELONGFUNCTION)) {
- uv_fatal_error(GetLastError(), "QueueUserWorkItem");
- }
- }
-}
-
-
-static int uv_pipe_write_impl(uv_loop_t* loop,
- uv_write_t* req,
- uv_pipe_t* handle,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
- int err;
- int result;
- uv_tcp_t* tcp_send_handle;
- uv_write_t* ipc_header_req;
- uv_ipc_frame_uv_stream ipc_frame;
-
- if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
- return ERROR_NOT_SUPPORTED;
- }
-
- /* Only TCP handles are supported for sharing. */
- if (send_handle && ((send_handle->type != UV_TCP) ||
- (!(send_handle->flags & UV_HANDLE_BOUND) &&
- !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
- return ERROR_NOT_SUPPORTED;
- }
-
- assert(handle->handle != INVALID_HANDLE_VALUE);
-
- uv_req_init(loop, (uv_req_t*) req);
- req->type = UV_WRITE;
- req->handle = (uv_stream_t*) handle;
- req->cb = cb;
- req->ipc_header = 0;
- req->event_handle = NULL;
- req->wait_handle = INVALID_HANDLE_VALUE;
- memset(&req->overlapped, 0, sizeof(req->overlapped));
-
- if (handle->ipc) {
- assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
- ipc_frame.header.flags = 0;
-
- /* Use the IPC framing protocol. */
- if (send_handle) {
- tcp_send_handle = (uv_tcp_t*)send_handle;
-
- err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
- &ipc_frame.socket_info);
- if (err) {
- return err;
- }
- ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
-
- if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
- ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
- }
- }
-
- if (nbufs == 1) {
- ipc_frame.header.flags |= UV_IPC_RAW_DATA;
- ipc_frame.header.raw_data_length = bufs[0].len;
- }
-
- /*
- * Use the provided req if we're only doing a single write.
- * If we're doing multiple writes, use ipc_header_write_req to do
- * the first write, and then use the provided req for the second write.
- */
- if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
- ipc_header_req = req;
- } else {
- /*
- * Try to use the preallocated write req if it's available.
- * Otherwise allocate a new one.
- */
- if (handle->ipc_header_write_req.type != UV_WRITE) {
- ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
- } else {
- ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
- if (!ipc_header_req) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
- }
- }
-
- uv_req_init(loop, (uv_req_t*) ipc_header_req);
- ipc_header_req->type = UV_WRITE;
- ipc_header_req->handle = (uv_stream_t*) handle;
- ipc_header_req->cb = NULL;
- ipc_header_req->ipc_header = 1;
- }
-
- /* Write the header or the whole frame. */
- memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
-
- /* Using overlapped IO, but wait for completion before returning.
- This write is blocking because ipc_frame is on stack. */
- ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
- if (!ipc_header_req->overlapped.hEvent) {
- uv_fatal_error(GetLastError(), "CreateEvent");
- }
-
- result = WriteFile(handle->handle,
- &ipc_frame,
- ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
- sizeof(ipc_frame) : sizeof(ipc_frame.header),
- NULL,
- &ipc_header_req->overlapped);
- if (!result && GetLastError() != ERROR_IO_PENDING) {
- err = GetLastError();
- CloseHandle(ipc_header_req->overlapped.hEvent);
- return err;
- }
-
- if (!result) {
- /* Request not completed immediately. Wait for it.*/
- if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
- WAIT_OBJECT_0) {
- err = GetLastError();
- CloseHandle(ipc_header_req->overlapped.hEvent);
- return err;
- }
- }
- ipc_header_req->queued_bytes = 0;
- CloseHandle(ipc_header_req->overlapped.hEvent);
- ipc_header_req->overlapped.hEvent = NULL;
-
- REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
- handle->reqs_pending++;
- handle->write_reqs_pending++;
-
- /* If we don't have any raw data to write - we're done. */
- if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
- return 0;
- }
- }
-
- if ((handle->flags &
- (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
- (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
- DWORD bytes;
- result = WriteFile(handle->handle,
- bufs[0].base,
- bufs[0].len,
- &bytes,
- NULL);
-
- if (!result) {
- return err;
- } else {
- /* Request completed immediately. */
- req->queued_bytes = 0;
- }
-
- REGISTER_HANDLE_REQ(loop, handle, req);
- handle->reqs_pending++;
- handle->write_reqs_pending++;
- POST_COMPLETION_FOR_REQ(loop, req);
- return 0;
- } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
- req->write_buffer = bufs[0];
- uv_insert_non_overlapped_write_req(handle, req);
- if (handle->write_reqs_pending == 0) {
- uv_queue_non_overlapped_write(handle);
- }
-
- /* Request queued by the kernel. */
- req->queued_bytes = uv_count_bufs(bufs, nbufs);
- handle->write_queue_size += req->queued_bytes;
- } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
- /* Using overlapped IO, but wait for completion before returning */
- req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
- if (!req->overlapped.hEvent) {
- uv_fatal_error(GetLastError(), "CreateEvent");
- }
-
- result = WriteFile(handle->handle,
- bufs[0].base,
- bufs[0].len,
- NULL,
- &req->overlapped);
-
- if (!result && GetLastError() != ERROR_IO_PENDING) {
- err = GetLastError();
- CloseHandle(req->overlapped.hEvent);
- return err;
- }
-
- if (result) {
- /* Request completed immediately. */
- req->queued_bytes = 0;
- } else {
- /* Request queued by the kernel. */
- if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
- WAIT_OBJECT_0) {
- err = GetLastError();
- CloseHandle(ipc_header_req->overlapped.hEvent);
- return uv_translate_sys_error(err);
- }
- }
- CloseHandle(req->overlapped.hEvent);
-
- REGISTER_HANDLE_REQ(loop, handle, req);
- handle->reqs_pending++;
- handle->write_reqs_pending++;
- POST_COMPLETION_FOR_REQ(loop, req);
- return 0;
- } else {
- result = WriteFile(handle->handle,
- bufs[0].base,
- bufs[0].len,
- NULL,
- &req->overlapped);
-
- if (!result && GetLastError() != ERROR_IO_PENDING) {
- return GetLastError();
- }
-
- if (result) {
- /* Request completed immediately. */
- req->queued_bytes = 0;
- } else {
- /* Request queued by the kernel. */
- req->queued_bytes = uv_count_bufs(bufs, nbufs);
- handle->write_queue_size += req->queued_bytes;
- }
-
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- req->event_handle = CreateEvent(NULL, 0, 0, NULL);
- if (!req->event_handle) {
- uv_fatal_error(GetLastError(), "CreateEvent");
- }
- if (!RegisterWaitForSingleObject(&req->wait_handle,
- req->overlapped.hEvent, post_completion_write_wait, (void*) req,
- INFINITE, WT_EXECUTEINWAITTHREAD)) {
- return GetLastError();
- }
- }
- }
-
- REGISTER_HANDLE_REQ(loop, handle, req);
- handle->reqs_pending++;
- handle->write_reqs_pending++;
-
- return 0;
-}
-
-
-int uv_pipe_write(uv_loop_t* loop,
- uv_write_t* req,
- uv_pipe_t* handle,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- uv_write_cb cb) {
- return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
-}
-
-
-int uv_pipe_write2(uv_loop_t* loop,
- uv_write_t* req,
- uv_pipe_t* handle,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
- if (!handle->ipc) {
- return WSAEINVAL;
- }
-
- return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);
-}
-
-
-static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
- uv_buf_t buf) {
- /* If there is an eof timer running, we don't need it any more, */
- /* so discard it. */
- eof_timer_destroy(handle);
-
- handle->flags &= ~UV_HANDLE_READABLE;
- uv_read_stop((uv_stream_t*) handle);
-
- if (handle->read2_cb) {
- handle->read2_cb(handle, UV_EOF, &uv_null_buf_, UV_UNKNOWN_HANDLE);
- } else {
- handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_);
- }
-}
-
-
-static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
- uv_buf_t buf) {
- /* If there is an eof timer running, we don't need it any more, */
- /* so discard it. */
- eof_timer_destroy(handle);
-
- uv_read_stop((uv_stream_t*) handle);
-
- if (handle->read2_cb) {
- handle->read2_cb(handle,
- uv_translate_sys_error(error),
- &buf,
- UV_UNKNOWN_HANDLE);
- } else {
- handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
- }
-}
-
-
-static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
- int error, uv_buf_t buf) {
- if (error == ERROR_BROKEN_PIPE) {
- uv_pipe_read_eof(loop, handle, buf);
- } else {
- uv_pipe_read_error(loop, handle, error, buf);
- }
-}
-
-
-void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
- uv_req_t* req) {
- DWORD bytes, avail;
- uv_buf_t buf;
- uv_ipc_frame_uv_stream ipc_frame;
-
- assert(handle->type == UV_NAMED_PIPE);
-
- handle->flags &= ~UV_HANDLE_READ_PENDING;
- eof_timer_stop(handle);
-
- if (!REQ_SUCCESS(req)) {
- /* An error occurred doing the 0-read. */
- if (handle->flags & UV_HANDLE_READING) {
- uv_pipe_read_error_or_eof(loop,
- handle,
- GET_REQ_ERROR(req),
- uv_null_buf_);
- }
- } else {
- /* Do non-blocking reads until the buffer is empty */
- while (handle->flags & UV_HANDLE_READING) {
- if (!PeekNamedPipe(handle->handle,
- NULL,
- 0,
- NULL,
- &avail,
- NULL)) {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
- break;
- }
-
- if (avail == 0) {
- /* There is nothing to read after all. */
- break;
- }
-
- if (handle->ipc) {
- /* Use the IPC framing protocol to read the incoming data. */
- if (handle->remaining_ipc_rawdata_bytes == 0) {
- /* We're reading a new frame. First, read the header. */
- assert(avail >= sizeof(ipc_frame.header));
-
- if (!ReadFile(handle->handle,
- &ipc_frame.header,
- sizeof(ipc_frame.header),
- &bytes,
- NULL)) {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
- uv_null_buf_);
- break;
- }
-
- assert(bytes == sizeof(ipc_frame.header));
- assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
- UV_IPC_TCP_CONNECTION));
-
- if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
- assert(avail - sizeof(ipc_frame.header) >=
- sizeof(ipc_frame.socket_info));
-
- /* Read the TCP socket info. */
- if (!ReadFile(handle->handle,
- &ipc_frame.socket_info,
- sizeof(ipc_frame) - sizeof(ipc_frame.header),
- &bytes,
- NULL)) {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
- uv_null_buf_);
- break;
- }
-
- assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
-
- /* Store the pending socket info. */
- assert(!handle->pending_ipc_info.socket_info);
- handle->pending_ipc_info.socket_info =
- (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
- if (!handle->pending_ipc_info.socket_info) {
- uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
- }
-
- *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
- handle->pending_ipc_info.tcp_connection =
- ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
- }
-
- if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
- handle->remaining_ipc_rawdata_bytes =
- ipc_frame.header.raw_data_length;
- continue;
- }
- } else {
- avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
- }
- }
-
- handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
- if (buf.len == 0) {
- if (handle->read2_cb) {
- handle->read2_cb(handle, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
- } else if (handle->read_cb) {
- handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
- }
- break;
- }
- assert(buf.base != NULL);
-
- if (ReadFile(handle->handle,
- buf.base,
- buf.len,
- &bytes,
- NULL)) {
- /* Successful read */
- if (handle->ipc) {
- assert(handle->remaining_ipc_rawdata_bytes >= bytes);
- handle->remaining_ipc_rawdata_bytes =
- handle->remaining_ipc_rawdata_bytes - bytes;
- if (handle->read2_cb) {
- handle->read2_cb(handle, bytes, &buf,
- handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
- } else if (handle->read_cb) {
- handle->read_cb((uv_stream_t*)handle, bytes, &buf);
- }
-
- if (handle->pending_ipc_info.socket_info) {
- free(handle->pending_ipc_info.socket_info);
- handle->pending_ipc_info.socket_info = NULL;
- }
- } else {
- handle->read_cb((uv_stream_t*)handle, bytes, &buf);
- }
-
- /* Read again only if bytes == buf.len */
- if (bytes <= buf.len) {
- break;
- }
- } else {
- uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
- break;
- }
- }
-
- /* Post another 0-read if still reading and not closing. */
- if ((handle->flags & UV_HANDLE_READING) &&
- !(handle->flags & UV_HANDLE_READ_PENDING)) {
- uv_pipe_queue_read(loop, handle);
- }
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
-}
-
-
-void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
- uv_write_t* req) {
- int err;
-
- assert(handle->type == UV_NAMED_PIPE);
-
- assert(handle->write_queue_size >= req->queued_bytes);
- handle->write_queue_size -= req->queued_bytes;
-
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
- if (req->wait_handle != INVALID_HANDLE_VALUE) {
- UnregisterWait(req->wait_handle);
- req->wait_handle = INVALID_HANDLE_VALUE;
- }
- if (req->event_handle) {
- CloseHandle(req->event_handle);
- req->event_handle = NULL;
- }
- }
-
- if (req->ipc_header) {
- if (req == &handle->ipc_header_write_req) {
- req->type = UV_UNKNOWN_REQ;
- } else {
- free(req);
- }
- } else {
- if (req->cb) {
- err = GET_REQ_ERROR(req);
- req->cb(req, uv_translate_sys_error(err));
- }
- }
-
- handle->write_reqs_pending--;
-
- if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
- handle->non_overlapped_writes_tail) {
- assert(handle->write_reqs_pending > 0);
- uv_queue_non_overlapped_write(handle);
- }
-
- if (handle->shutdown_req != NULL &&
- handle->write_reqs_pending == 0) {
- uv_want_endgame(loop, (uv_handle_t*)handle);
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
-}
-
-
-void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
- uv_req_t* raw_req) {
- uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
-
- assert(handle->type == UV_NAMED_PIPE);
-
- if (handle->flags & UV__HANDLE_CLOSING) {
- /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
- assert(req->pipeHandle == INVALID_HANDLE_VALUE);
- DECREASE_PENDING_REQ_COUNT(handle);
- return;
- }
-
- if (REQ_SUCCESS(req)) {
- assert(req->pipeHandle != INVALID_HANDLE_VALUE);
- req->next_pending = handle->pending_accepts;
- handle->pending_accepts = req;
-
- if (handle->connection_cb) {
- handle->connection_cb((uv_stream_t*)handle, 0);
- }
- } else {
- if (req->pipeHandle != INVALID_HANDLE_VALUE) {
- CloseHandle(req->pipeHandle);
- req->pipeHandle = INVALID_HANDLE_VALUE;
- }
- if (!(handle->flags & UV__HANDLE_CLOSING)) {
- uv_pipe_queue_accept(loop, handle, req, FALSE);
- }
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
-}
-
-
-void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
- uv_connect_t* req) {
- int err;
-
- assert(handle->type == UV_NAMED_PIPE);
-
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- if (req->cb) {
- err = 0;
- if (REQ_SUCCESS(req)) {
- uv_pipe_connection_init(handle);
- } else {
- err = GET_REQ_ERROR(req);
- }
- req->cb(req, uv_translate_sys_error(err));
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
-}
-
-
-void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
- uv_shutdown_t* req) {
- assert(handle->type == UV_NAMED_PIPE);
-
- UNREGISTER_HANDLE_REQ(loop, handle, req);
-
- if (handle->flags & UV_HANDLE_READABLE) {
- /* Initialize and optionally start the eof timer. Only do this if the */
- /* pipe is readable and we haven't seen EOF come in ourselves. */
- eof_timer_init(handle);
-
- /* If reading start the timer right now. */
- /* Otherwise uv_pipe_queue_read will start it. */
- if (handle->flags & UV_HANDLE_READ_PENDING) {
- eof_timer_start(handle);
- }
-
- } else {
- /* This pipe is not readable. We can just close it to let the other end */
- /* know that we're done writing. */
- CloseHandle(handle->handle);
- handle->handle = INVALID_HANDLE_VALUE;
- }
-
- if (req->cb) {
- req->cb(req, 0);
- }
-
- DECREASE_PENDING_REQ_COUNT(handle);
-}
-
-
-static void eof_timer_init(uv_pipe_t* pipe) {
- int r;
-
- assert(pipe->eof_timer == NULL);
- assert(pipe->flags & UV_HANDLE_CONNECTION);
-
- pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
-
- r = uv_timer_init(pipe->loop, pipe->eof_timer);
- assert(r == 0); /* timers can't fail */
- pipe->eof_timer->data = pipe;
- uv_unref((uv_handle_t*) pipe->eof_timer);
-}
-
-
-static void eof_timer_start(uv_pipe_t* pipe) {
- assert(pipe->flags & UV_HANDLE_CONNECTION);
-
- if (pipe->eof_timer != NULL) {
- uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
- }
-}
-
-
-static void eof_timer_stop(uv_pipe_t* pipe) {
- assert(pipe->flags & UV_HANDLE_CONNECTION);
-
- if (pipe->eof_timer != NULL) {
- uv_timer_stop(pipe->eof_timer);
- }
-}
-
-
-static void eof_timer_cb(uv_timer_t* timer, int status) {
- uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
- uv_loop_t* loop = timer->loop;
-
- assert(status == 0); /* timers can't fail */
- assert(pipe->type == UV_NAMED_PIPE);
-
- /* This should always be true, since we start the timer only */
- /* in uv_pipe_queue_read after successfully calling ReadFile, */
- /* or in uv_process_pipe_shutdown_req if a read is pending, */
- /* and we always immediately stop the timer in */
- /* uv_process_pipe_read_req. */
- assert(pipe->flags & UV_HANDLE_READ_PENDING);
-
- /* If there are many packets coming off the iocp then the timer callback */
- /* may be called before the read request is coming off the queue. */
- /* Therefore we check here if the read request has completed but will */
- /* be processed later. */
- if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
- HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
- return;
- }
-
- /* Force both ends off the pipe. */
- CloseHandle(pipe->handle);
- pipe->handle = INVALID_HANDLE_VALUE;
-
- /* Stop reading, so the pending read that is going to fail will */
- /* not be reported to the user. */
- uv_read_stop((uv_stream_t*) pipe);
-
- /* Report the eof and update flags. This will get reported even if the */
- /* user stopped reading in the meantime. TODO: is that okay? */
- uv_pipe_read_eof(loop, pipe, uv_null_buf_);
-}
-
-
-static void eof_timer_destroy(uv_pipe_t* pipe) {
- assert(pipe->flags && UV_HANDLE_CONNECTION);
-
- if (pipe->eof_timer) {
- uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
- pipe->eof_timer = NULL;
- }
-}
-
-
-static void eof_timer_close_cb(uv_handle_t* handle) {
- assert(handle->type == UV_TIMER);
- free(handle);
-}
-
-
-int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
- HANDLE os_handle = (HANDLE)_get_osfhandle(file);
-
- if (os_handle == INVALID_HANDLE_VALUE ||
- uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
- return UV_EINVAL;
- }
-
- uv_pipe_connection_init(pipe);
- pipe->handle = os_handle;
- pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
-
- if (pipe->ipc) {
- assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
- pipe->ipc_pid = uv_parent_pid();
- assert(pipe->ipc_pid != -1);
- }
- return 0;
-}
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include <assert.h>
+#include <io.h>
+#include <string.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "uv.h"
+#include "internal.h"
+#include "handle-inl.h"
+#include "stream-inl.h"
+#include "req-inl.h"
+
+
+/* A zero-size buffer for use by uv_pipe_read */
+static char uv_zero_[] = "";
+
+/* Null uv_buf_t */
+static const uv_buf_t uv_null_buf_ = { 0, NULL };
+
+/* The timeout that the pipe will wait for the remote end to write data */
+/* when the local ends wants to shut it down. */
+static const int64_t eof_timeout = 50; /* ms */
+
+static const int default_pending_pipe_instances = 4;
+
+/* IPC protocol flags. */
+#define UV_IPC_RAW_DATA 0x0001
+#define UV_IPC_TCP_SERVER 0x0002
+#define UV_IPC_TCP_CONNECTION 0x0004
+
+/* IPC frame header. */
+typedef struct {
+ int flags;
+ uint64_t raw_data_length;
+} uv_ipc_frame_header_t;
+
+/* IPC frame, which contains an imported TCP socket stream. */
+typedef struct {
+ uv_ipc_frame_header_t header;
+ WSAPROTOCOL_INFOW socket_info;
+} uv_ipc_frame_uv_stream;
+
+static void eof_timer_init(uv_pipe_t* pipe);
+static void eof_timer_start(uv_pipe_t* pipe);
+static void eof_timer_stop(uv_pipe_t* pipe);
+static void eof_timer_cb(uv_timer_t* timer, int status);
+static void eof_timer_destroy(uv_pipe_t* pipe);
+static void eof_timer_close_cb(uv_handle_t* handle);
+
+
+static void uv_unique_pipe_name(char* ptr, char* name, size_t size) {
+ _snprintf(name, size, "\\\\.\\pipe\\uv\\%p-%u", ptr, GetCurrentProcessId());
+}
+
+
+int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
+ uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
+
+ handle->reqs_pending = 0;
+ handle->handle = INVALID_HANDLE_VALUE;
+ handle->name = NULL;
+ handle->ipc_pid = 0;
+ handle->remaining_ipc_rawdata_bytes = 0;
+ handle->pending_ipc_info.socket_info = NULL;
+ handle->pending_ipc_info.tcp_connection = 0;
+ handle->ipc = ipc;
+ handle->non_overlapped_writes_tail = NULL;
+
+ uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
+
+ return 0;
+}
+
+
+static void uv_pipe_connection_init(uv_pipe_t* handle) {
+ uv_connection_init((uv_stream_t*) handle);
+ handle->read_req.data = handle;
+ handle->eof_timer = NULL;
+}
+
+
+static HANDLE open_named_pipe(WCHAR* name, DWORD* duplex_flags) {
+ HANDLE pipeHandle;
+
+ /*
+ * Assume that we have a duplex pipe first, so attempt to
+ * connect with GENERIC_READ | GENERIC_WRITE.
+ */
+ pipeHandle = CreateFileW(name,
+ GENERIC_READ | GENERIC_WRITE,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ *duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
+ return pipeHandle;
+ }
+
+ /*
+ * If the pipe is not duplex CreateFileW fails with
+ * ERROR_ACCESS_DENIED. In that case try to connect
+ * as a read-only or write-only.
+ */
+ if (GetLastError() == ERROR_ACCESS_DENIED) {
+ pipeHandle = CreateFileW(name,
+ GENERIC_READ | FILE_WRITE_ATTRIBUTES,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ *duplex_flags = UV_HANDLE_READABLE;
+ return pipeHandle;
+ }
+ }
+
+ if (GetLastError() == ERROR_ACCESS_DENIED) {
+ pipeHandle = CreateFileW(name,
+ GENERIC_WRITE | FILE_READ_ATTRIBUTES,
+ 0,
+ NULL,
+ OPEN_EXISTING,
+ FILE_FLAG_OVERLAPPED,
+ NULL);
+
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ *duplex_flags = UV_HANDLE_WRITABLE;
+ return pipeHandle;
+ }
+ }
+
+ return INVALID_HANDLE_VALUE;
+}
+
+
+int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
+ char* name, size_t nameSize) {
+ HANDLE pipeHandle;
+ int err;
+ char* ptr = (char*)handle;
+
+ for (;;) {
+ uv_unique_pipe_name(ptr, name, nameSize);
+
+ pipeHandle = CreateNamedPipeA(name,
+ access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
+ NULL);
+
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ /* No name collisions. We're done. */
+ break;
+ }
+
+ err = GetLastError();
+ if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
+ goto error;
+ }
+
+ /* Pipe name collision. Increment the pointer and try again. */
+ ptr++;
+ }
+
+ if (CreateIoCompletionPort(pipeHandle,
+ loop->iocp,
+ (ULONG_PTR)handle,
+ 0) == NULL) {
+ err = GetLastError();
+ goto error;
+ }
+
+ uv_pipe_connection_init(handle);
+ handle->handle = pipeHandle;
+
+ return 0;
+
+ error:
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(pipeHandle);
+ }
+
+ return err;
+}
+
+
+static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
+ HANDLE pipeHandle, DWORD duplex_flags) {
+ NTSTATUS nt_status;
+ IO_STATUS_BLOCK io_status;
+ FILE_MODE_INFORMATION mode_info;
+ DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
+
+ if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
+ /* If this returns ERROR_INVALID_PARAMETER we probably opened something */
+ /* that is not a pipe. */
+ if (GetLastError() == ERROR_INVALID_PARAMETER) {
+ SetLastError(WSAENOTSOCK);
+ }
+ return -1;
+ }
+
+ /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
+ nt_status = pNtQueryInformationFile(pipeHandle,
+ &io_status,
+ &mode_info,
+ sizeof(mode_info),
+ FileModeInformation);
+ if (nt_status != STATUS_SUCCESS) {
+ return -1;
+ }
+
+ if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
+ mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
+ /* Non-overlapped pipe. */
+ handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
+ } else {
+ /* Overlapped pipe. Try to associate with IOCP. */
+ if (CreateIoCompletionPort(pipeHandle,
+ loop->iocp,
+ (ULONG_PTR)handle,
+ 0) == NULL) {
+ handle->flags |= UV_HANDLE_EMULATE_IOCP;
+ }
+ }
+
+ handle->handle = pipeHandle;
+ handle->flags |= duplex_flags;
+
+ return 0;
+}
+
+
+static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
+ uv_loop_t* loop;
+ uv_pipe_t* handle;
+ uv_shutdown_t* req;
+
+ req = (uv_shutdown_t*) parameter;
+ assert(req);
+ handle = (uv_pipe_t*) req->handle;
+ assert(handle);
+ loop = handle->loop;
+ assert(loop);
+
+ FlushFileBuffers(handle->handle);
+
+ /* Post completed */
+ POST_COMPLETION_FOR_REQ(loop, req);
+
+ return 0;
+}
+
+
+void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
+ int err;
+ DWORD result;
+ uv_shutdown_t* req;
+ NTSTATUS nt_status;
+ IO_STATUS_BLOCK io_status;
+ FILE_PIPE_LOCAL_INFORMATION pipe_info;
+
+ if ((handle->flags & UV_HANDLE_CONNECTION) &&
+ handle->shutdown_req != NULL &&
+ handle->write_reqs_pending == 0) {
+ req = handle->shutdown_req;
+
+ /* Clear the shutdown_req field so we don't go here again. */
+ handle->shutdown_req = NULL;
+
+ if (handle->flags & UV__HANDLE_CLOSING) {
+ UNREGISTER_HANDLE_REQ(loop, handle, req);
+
+ /* Already closing. Cancel the shutdown. */
+ if (req->cb) {
+ req->cb(req, UV_ECANCELED);
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+ return;
+ }
+
+ /* Try to avoid flushing the pipe buffer in the thread pool. */
+ nt_status = pNtQueryInformationFile(handle->handle,
+ &io_status,
+ &pipe_info,
+ sizeof pipe_info,
+ FilePipeLocalInformation);
+
+ if (nt_status != STATUS_SUCCESS) {
+ /* Failure */
+ UNREGISTER_HANDLE_REQ(loop, handle, req);
+
+ handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
+ if (req->cb) {
+ err = pRtlNtStatusToDosError(nt_status);
+ req->cb(req, uv_translate_sys_error(err));
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+ return;
+ }
+
+ if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
+ /* Short-circuit, no need to call FlushFileBuffers. */
+ uv_insert_pending_req(loop, (uv_req_t*) req);
+ return;
+ }
+
+ /* Run FlushFileBuffers in the thread pool. */
+ result = QueueUserWorkItem(pipe_shutdown_thread_proc,
+ req,
+ WT_EXECUTELONGFUNCTION);
+ if (result) {
+ return;
+
+ } else {
+ /* Failure. */
+ UNREGISTER_HANDLE_REQ(loop, handle, req);
+
+ handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
+ if (req->cb) {
+ err = GetLastError();
+ req->cb(req, uv_translate_sys_error(err));
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+ return;
+ }
+ }
+
+ if (handle->flags & UV__HANDLE_CLOSING &&
+ handle->reqs_pending == 0) {
+ assert(!(handle->flags & UV_HANDLE_CLOSED));
+
+ if (handle->flags & UV_HANDLE_CONNECTION) {
+ if (handle->pending_ipc_info.socket_info) {
+ free(handle->pending_ipc_info.socket_info);
+ handle->pending_ipc_info.socket_info = NULL;
+ }
+
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
+ UnregisterWait(handle->read_req.wait_handle);
+ handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
+ }
+ if (handle->read_req.event_handle) {
+ CloseHandle(handle->read_req.event_handle);
+ handle->read_req.event_handle = NULL;
+ }
+ }
+ }
+
+ if (handle->flags & UV_HANDLE_PIPESERVER) {
+ assert(handle->accept_reqs);
+ free(handle->accept_reqs);
+ handle->accept_reqs = NULL;
+ }
+
+ uv__handle_close(handle);
+ }
+}
+
+
+void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
+ handle->pending_instances = count;
+ handle->flags |= UV_HANDLE_PIPESERVER;
+}
+
+
+/* Creates a pipe server. */
+int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
+ uv_loop_t* loop = handle->loop;
+ int i, err, nameSize;
+ uv_pipe_accept_t* req;
+
+ if (handle->flags & UV_HANDLE_BOUND) {
+ return UV_EINVAL;
+ }
+
+ if (!name) {
+ return UV_EINVAL;
+ }
+
+ if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
+ handle->pending_instances = default_pending_pipe_instances;
+ }
+
+ handle->accept_reqs = (uv_pipe_accept_t*)
+ malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances);
+ if (!handle->accept_reqs) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+
+ for (i = 0; i < handle->pending_instances; i++) {
+ req = &handle->accept_reqs[i];
+ uv_req_init(loop, (uv_req_t*) req);
+ req->type = UV_ACCEPT;
+ req->data = handle;
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+ req->next_pending = NULL;
+ }
+
+ /* Convert name to UTF16. */
+ nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
+ handle->name = (WCHAR*)malloc(nameSize);
+ if (!handle->name) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+
+ if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
+ return uv_translate_sys_error(GetLastError());
+ }
+
+ /*
+ * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
+ * If this fails then there's already a pipe server for the given pipe name.
+ */
+ handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
+ FILE_FLAG_FIRST_PIPE_INSTANCE,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
+
+ if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
+ err = GetLastError();
+ if (err == ERROR_ACCESS_DENIED) {
+ err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
+ } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
+ err = WSAEACCES; /* Translates to UV_EACCES. */
+ }
+ goto error;
+ }
+
+ if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
+ err = GetLastError();
+ goto error;
+ }
+
+ handle->pending_accepts = NULL;
+ handle->flags |= UV_HANDLE_PIPESERVER;
+ handle->flags |= UV_HANDLE_BOUND;
+
+ return 0;
+
+error:
+ if (handle->name) {
+ free(handle->name);
+ handle->name = NULL;
+ }
+
+ if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(handle->accept_reqs[0].pipeHandle);
+ handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
+ }
+
+ return uv_translate_sys_error(err);
+}
+
+
+static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
+ uv_loop_t* loop;
+ uv_pipe_t* handle;
+ uv_connect_t* req;
+ HANDLE pipeHandle = INVALID_HANDLE_VALUE;
+ DWORD duplex_flags;
+
+ req = (uv_connect_t*) parameter;
+ assert(req);
+ handle = (uv_pipe_t*) req->handle;
+ assert(handle);
+ loop = handle->loop;
+ assert(loop);
+
+ /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. */
+ /* We wait for the pipe to become available with WaitNamedPipe. */
+ while (WaitNamedPipeW(handle->name, 30000)) {
+ /* The pipe is now available, try to connect. */
+ pipeHandle = open_named_pipe(handle->name, &duplex_flags);
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ break;
+ }
+
+ SwitchToThread();
+ }
+
+ if (pipeHandle != INVALID_HANDLE_VALUE &&
+ !uv_set_pipe_handle(loop, handle, pipeHandle, duplex_flags)) {
+ SET_REQ_SUCCESS(req);
+ } else {
+ SET_REQ_ERROR(req, GetLastError());
+ }
+
+ /* Post completed */
+ POST_COMPLETION_FOR_REQ(loop, req);
+
+ return 0;
+}
+
+
+void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
+ const char* name, uv_connect_cb cb) {
+ uv_loop_t* loop = handle->loop;
+ int err, nameSize;
+ HANDLE pipeHandle = INVALID_HANDLE_VALUE;
+ DWORD duplex_flags;
+
+ uv_req_init(loop, (uv_req_t*) req);
+ req->type = UV_CONNECT;
+ req->handle = (uv_stream_t*) handle;
+ req->cb = cb;
+
+ /* Convert name to UTF16. */
+ nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR);
+ handle->name = (WCHAR*)malloc(nameSize);
+ if (!handle->name) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+
+ if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
+ err = GetLastError();
+ goto error;
+ }
+
+ pipeHandle = open_named_pipe(handle->name, &duplex_flags);
+ if (pipeHandle == INVALID_HANDLE_VALUE) {
+ if (GetLastError() == ERROR_PIPE_BUSY) {
+ /* Wait for the server to make a pipe instance available. */
+ if (!QueueUserWorkItem(&pipe_connect_thread_proc,
+ req,
+ WT_EXECUTELONGFUNCTION)) {
+ err = GetLastError();
+ goto error;
+ }
+
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ handle->reqs_pending++;
+
+ return;
+ }
+
+ err = GetLastError();
+ goto error;
+ }
+
+ assert(pipeHandle != INVALID_HANDLE_VALUE);
+
+ if (uv_set_pipe_handle(loop,
+ (uv_pipe_t*) req->handle,
+ pipeHandle,
+ duplex_flags)) {
+ err = GetLastError();
+ goto error;
+ }
+
+ SET_REQ_SUCCESS(req);
+ uv_insert_pending_req(loop, (uv_req_t*) req);
+ handle->reqs_pending++;
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ return;
+
+error:
+ if (handle->name) {
+ free(handle->name);
+ handle->name = NULL;
+ }
+
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(pipeHandle);
+ }
+
+ /* Make this req pending reporting an error. */
+ SET_REQ_ERROR(req, err);
+ uv_insert_pending_req(loop, (uv_req_t*) req);
+ handle->reqs_pending++;
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ return;
+}
+
+
+/* Cleans up uv_pipe_t (server or connection) and all resources associated */
+/* with it. */
+void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) {
+ int i;
+ HANDLE pipeHandle;
+
+ if (handle->name) {
+ free(handle->name);
+ handle->name = NULL;
+ }
+
+ if (handle->flags & UV_HANDLE_PIPESERVER) {
+ for (i = 0; i < handle->pending_instances; i++) {
+ pipeHandle = handle->accept_reqs[i].pipeHandle;
+ if (pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(pipeHandle);
+ handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
+ }
+ }
+ }
+
+ if (handle->flags & UV_HANDLE_CONNECTION) {
+ handle->flags &= ~UV_HANDLE_WRITABLE;
+ eof_timer_destroy(handle);
+ }
+
+ if ((handle->flags & UV_HANDLE_CONNECTION)
+ && handle->handle != INVALID_HANDLE_VALUE) {
+ CloseHandle(handle->handle);
+ handle->handle = INVALID_HANDLE_VALUE;
+ }
+}
+
+
+void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
+ if (handle->flags & UV_HANDLE_READING) {
+ handle->flags &= ~UV_HANDLE_READING;
+ DECREASE_ACTIVE_COUNT(loop, handle);
+ }
+
+ if (handle->flags & UV_HANDLE_LISTENING) {
+ handle->flags &= ~UV_HANDLE_LISTENING;
+ DECREASE_ACTIVE_COUNT(loop, handle);
+ }
+
+ uv_pipe_cleanup(loop, handle);
+
+ if (handle->reqs_pending == 0) {
+ uv_want_endgame(loop, (uv_handle_t*) handle);
+ }
+
+ handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
+ uv__handle_closing(handle);
+}
+
+
+static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
+ uv_pipe_accept_t* req, BOOL firstInstance) {
+ assert(handle->flags & UV_HANDLE_LISTENING);
+
+ if (!firstInstance) {
+ assert(req->pipeHandle == INVALID_HANDLE_VALUE);
+
+ req->pipeHandle = CreateNamedPipeW(handle->name,
+ PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
+ PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
+ PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
+
+ if (req->pipeHandle == INVALID_HANDLE_VALUE) {
+ SET_REQ_ERROR(req, GetLastError());
+ uv_insert_pending_req(loop, (uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
+ }
+
+ if (uv_set_pipe_handle(loop, handle, req->pipeHandle, 0)) {
+ CloseHandle(req->pipeHandle);
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+ SET_REQ_ERROR(req, GetLastError());
+ uv_insert_pending_req(loop, (uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
+ }
+ }
+
+ assert(req->pipeHandle != INVALID_HANDLE_VALUE);
+
+ /* Prepare the overlapped structure. */
+ memset(&(req->overlapped), 0, sizeof(req->overlapped));
+
+ if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) &&
+ GetLastError() != ERROR_IO_PENDING) {
+ if (GetLastError() == ERROR_PIPE_CONNECTED) {
+ SET_REQ_SUCCESS(req);
+ } else {
+ CloseHandle(req->pipeHandle);
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+ /* Make this req pending reporting an error. */
+ SET_REQ_ERROR(req, GetLastError());
+ }
+ uv_insert_pending_req(loop, (uv_req_t*) req);
+ handle->reqs_pending++;
+ return;
+ }
+
+ handle->reqs_pending++;
+}
+
+
+int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
+ uv_loop_t* loop = server->loop;
+ uv_pipe_t* pipe_client;
+ uv_pipe_accept_t* req;
+
+ if (server->ipc) {
+ if (!server->pending_ipc_info.socket_info) {
+ /* No valid pending sockets. */
+ return WSAEWOULDBLOCK;
+ }
+
+ return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
+ server->pending_ipc_info.tcp_connection);
+ } else {
+ pipe_client = (uv_pipe_t*)client;
+
+ /* Find a connection instance that has been connected, but not yet */
+ /* accepted. */
+ req = server->pending_accepts;
+
+ if (!req) {
+ /* No valid connections found, so we error out. */
+ return WSAEWOULDBLOCK;
+ }
+
+ /* Initialize the client handle and copy the pipeHandle to the client */
+ uv_pipe_connection_init(pipe_client);
+ pipe_client->handle = req->pipeHandle;
+ pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
+
+ /* Prepare the req to pick up a new connection */
+ server->pending_accepts = req->next_pending;
+ req->next_pending = NULL;
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+
+ if (!(server->flags & UV__HANDLE_CLOSING)) {
+ uv_pipe_queue_accept(loop, server, req, FALSE);
+ }
+ }
+
+ return 0;
+}
+
+
+/* Starts listening for connections for the given pipe. */
+int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
+ uv_loop_t* loop = handle->loop;
+ int i;
+
+ if (handle->flags & UV_HANDLE_LISTENING) {
+ handle->connection_cb = cb;
+ }
+
+ if (!(handle->flags & UV_HANDLE_BOUND)) {
+ return WSAEINVAL;
+ }
+
+ if (handle->flags & UV_HANDLE_READING) {
+ return WSAEISCONN;
+ }
+
+ if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
+ return ERROR_NOT_SUPPORTED;
+ }
+
+ handle->flags |= UV_HANDLE_LISTENING;
+ INCREASE_ACTIVE_COUNT(loop, handle);
+ handle->connection_cb = cb;
+
+ /* First pipe handle should have already been created in uv_pipe_bind */
+ assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
+
+ for (i = 0; i < handle->pending_instances; i++) {
+ uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
+ }
+
+ return 0;
+}
+
+
+static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
+ int result;
+ DWORD bytes;
+ uv_read_t* req = (uv_read_t*) parameter;
+ uv_pipe_t* handle = (uv_pipe_t*) req->data;
+ uv_loop_t* loop = handle->loop;
+
+ assert(req != NULL);
+ assert(req->type == UV_READ);
+ assert(handle->type == UV_NAMED_PIPE);
+
+ result = ReadFile(handle->handle,
+ &uv_zero_,
+ 0,
+ &bytes,
+ NULL);
+
+ if (!result) {
+ SET_REQ_ERROR(req, GetLastError());
+ }
+
+ POST_COMPLETION_FOR_REQ(loop, req);
+ return 0;
+}
+
+
+static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
+ int result;
+ DWORD bytes;
+ uv_write_t* req = (uv_write_t*) parameter;
+ uv_pipe_t* handle = (uv_pipe_t*) req->handle;
+ uv_loop_t* loop = handle->loop;
+
+ assert(req != NULL);
+ assert(req->type == UV_WRITE);
+ assert(handle->type == UV_NAMED_PIPE);
+ assert(req->write_buffer.base);
+
+ result = WriteFile(handle->handle,
+ req->write_buffer.base,
+ req->write_buffer.len,
+ &bytes,
+ NULL);
+
+ if (!result) {
+ SET_REQ_ERROR(req, GetLastError());
+ }
+
+ POST_COMPLETION_FOR_REQ(loop, req);
+ return 0;
+}
+
+
+static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
+ uv_read_t* req;
+ uv_tcp_t* handle;
+
+ req = (uv_read_t*) context;
+ assert(req != NULL);
+ handle = (uv_tcp_t*)req->data;
+ assert(handle != NULL);
+ assert(!timed_out);
+
+ if (!PostQueuedCompletionStatus(handle->loop->iocp,
+ req->overlapped.InternalHigh,
+ 0,
+ &req->overlapped)) {
+ uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
+ }
+}
+
+
+static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
+ uv_write_t* req;
+ uv_tcp_t* handle;
+
+ req = (uv_write_t*) context;
+ assert(req != NULL);
+ handle = (uv_tcp_t*)req->handle;
+ assert(handle != NULL);
+ assert(!timed_out);
+
+ if (!PostQueuedCompletionStatus(handle->loop->iocp,
+ req->overlapped.InternalHigh,
+ 0,
+ &req->overlapped)) {
+ uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
+ }
+}
+
+
+static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
+ uv_read_t* req;
+ int result;
+
+ assert(handle->flags & UV_HANDLE_READING);
+ assert(!(handle->flags & UV_HANDLE_READ_PENDING));
+
+ assert(handle->handle != INVALID_HANDLE_VALUE);
+
+ req = &handle->read_req;
+
+ if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
+ if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
+ req,
+ WT_EXECUTELONGFUNCTION)) {
+ /* Make this req pending reporting an error. */
+ SET_REQ_ERROR(req, GetLastError());
+ goto error;
+ }
+ } else {
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
+ }
+
+ /* Do 0-read */
+ result = ReadFile(handle->handle,
+ &uv_zero_,
+ 0,
+ NULL,
+ &req->overlapped);
+
+ if (!result && GetLastError() != ERROR_IO_PENDING) {
+ /* Make this req pending reporting an error. */
+ SET_REQ_ERROR(req, GetLastError());
+ goto error;
+ }
+
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ if (!req->event_handle) {
+ req->event_handle = CreateEvent(NULL, 0, 0, NULL);
+ if (!req->event_handle) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+ }
+ if (req->wait_handle == INVALID_HANDLE_VALUE) {
+ if (!RegisterWaitForSingleObject(&req->wait_handle,
+ req->overlapped.hEvent, post_completion_read_wait, (void*) req,
+ INFINITE, WT_EXECUTEINWAITTHREAD)) {
+ SET_REQ_ERROR(req, GetLastError());
+ goto error;
+ }
+ }
+ }
+ }
+
+ /* Start the eof timer if there is one */
+ eof_timer_start(handle);
+ handle->flags |= UV_HANDLE_READ_PENDING;
+ handle->reqs_pending++;
+ return;
+
+error:
+ uv_insert_pending_req(loop, (uv_req_t*)req);
+ handle->flags |= UV_HANDLE_READ_PENDING;
+ handle->reqs_pending++;
+}
+
+
+static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
+ uv_read_cb read_cb, uv_read2_cb read2_cb) {
+ uv_loop_t* loop = handle->loop;
+
+ handle->flags |= UV_HANDLE_READING;
+ INCREASE_ACTIVE_COUNT(loop, handle);
+ handle->read_cb = read_cb;
+ handle->read2_cb = read2_cb;
+ handle->alloc_cb = alloc_cb;
+
+ /* If reading was stopped and then started again, there could still be a */
+ /* read request pending. */
+ if (!(handle->flags & UV_HANDLE_READ_PENDING))
+ uv_pipe_queue_read(loop, handle);
+
+ return 0;
+}
+
+
+int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
+ uv_read_cb read_cb) {
+ return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
+}
+
+
+int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
+ uv_read2_cb read_cb) {
+ return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
+}
+
+
+static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
+ uv_write_t* req) {
+ req->next_req = NULL;
+ if (handle->non_overlapped_writes_tail) {
+ req->next_req =
+ handle->non_overlapped_writes_tail->next_req;
+ handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req;
+ handle->non_overlapped_writes_tail = req;
+ } else {
+ req->next_req = (uv_req_t*)req;
+ handle->non_overlapped_writes_tail = req;
+ }
+}
+
+
+static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
+ uv_write_t* req;
+
+ if (handle->non_overlapped_writes_tail) {
+ req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req;
+
+ if (req == handle->non_overlapped_writes_tail) {
+ handle->non_overlapped_writes_tail = NULL;
+ } else {
+ handle->non_overlapped_writes_tail->next_req =
+ req->next_req;
+ }
+
+ return req;
+ } else {
+ /* queue empty */
+ return NULL;
+ }
+}
+
+
+static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
+ uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
+ if (req) {
+ if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
+ req,
+ WT_EXECUTELONGFUNCTION)) {
+ uv_fatal_error(GetLastError(), "QueueUserWorkItem");
+ }
+ }
+}
+
+
+static int uv_pipe_write_impl(uv_loop_t* loop,
+ uv_write_t* req,
+ uv_pipe_t* handle,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle,
+ uv_write_cb cb) {
+ int err;
+ int result;
+ uv_tcp_t* tcp_send_handle;
+ uv_write_t* ipc_header_req;
+ uv_ipc_frame_uv_stream ipc_frame;
+
+ if (nbufs != 1 && (nbufs != 0 || !send_handle)) {
+ return ERROR_NOT_SUPPORTED;
+ }
+
+ /* Only TCP handles are supported for sharing. */
+ if (send_handle && ((send_handle->type != UV_TCP) ||
+ (!(send_handle->flags & UV_HANDLE_BOUND) &&
+ !(send_handle->flags & UV_HANDLE_CONNECTION)))) {
+ return ERROR_NOT_SUPPORTED;
+ }
+
+ assert(handle->handle != INVALID_HANDLE_VALUE);
+
+ uv_req_init(loop, (uv_req_t*) req);
+ req->type = UV_WRITE;
+ req->handle = (uv_stream_t*) handle;
+ req->cb = cb;
+ req->ipc_header = 0;
+ req->event_handle = NULL;
+ req->wait_handle = INVALID_HANDLE_VALUE;
+ memset(&req->overlapped, 0, sizeof(req->overlapped));
+
+ if (handle->ipc) {
+ assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
+ ipc_frame.header.flags = 0;
+
+ /* Use the IPC framing protocol. */
+ if (send_handle) {
+ tcp_send_handle = (uv_tcp_t*)send_handle;
+
+ err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
+ &ipc_frame.socket_info);
+ if (err) {
+ return err;
+ }
+ ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
+
+ if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
+ ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
+ }
+ }
+
+ if (nbufs == 1) {
+ ipc_frame.header.flags |= UV_IPC_RAW_DATA;
+ ipc_frame.header.raw_data_length = bufs[0].len;
+ }
+
+ /*
+ * Use the provided req if we're only doing a single write.
+ * If we're doing multiple writes, use ipc_header_write_req to do
+ * the first write, and then use the provided req for the second write.
+ */
+ if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
+ ipc_header_req = req;
+ } else {
+ /*
+ * Try to use the preallocated write req if it's available.
+ * Otherwise allocate a new one.
+ */
+ if (handle->ipc_header_write_req.type != UV_WRITE) {
+ ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
+ } else {
+ ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
+ if (!ipc_header_req) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+ }
+
+ uv_req_init(loop, (uv_req_t*) ipc_header_req);
+ ipc_header_req->type = UV_WRITE;
+ ipc_header_req->handle = (uv_stream_t*) handle;
+ ipc_header_req->cb = NULL;
+ ipc_header_req->ipc_header = 1;
+ }
+
+ /* Write the header or the whole frame. */
+ memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
+
+ /* Using overlapped IO, but wait for completion before returning.
+ This write is blocking because ipc_frame is on stack. */
+ ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
+ if (!ipc_header_req->overlapped.hEvent) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+
+ result = WriteFile(handle->handle,
+ &ipc_frame,
+ ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
+ sizeof(ipc_frame) : sizeof(ipc_frame.header),
+ NULL,
+ &ipc_header_req->overlapped);
+ if (!result && GetLastError() != ERROR_IO_PENDING) {
+ err = GetLastError();
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ return err;
+ }
+
+ if (!result) {
+ /* Request not completed immediately. Wait for it.*/
+ if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
+ WAIT_OBJECT_0) {
+ err = GetLastError();
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ return err;
+ }
+ }
+ ipc_header_req->queued_bytes = 0;
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ ipc_header_req->overlapped.hEvent = NULL;
+
+ REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
+ handle->reqs_pending++;
+ handle->write_reqs_pending++;
+
+ /* If we don't have any raw data to write - we're done. */
+ if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
+ return 0;
+ }
+ }
+
+ if ((handle->flags &
+ (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
+ (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
+ DWORD bytes;
+ result = WriteFile(handle->handle,
+ bufs[0].base,
+ bufs[0].len,
+ &bytes,
+ NULL);
+
+ if (!result) {
+ return err;
+ } else {
+ /* Request completed immediately. */
+ req->queued_bytes = 0;
+ }
+
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ handle->reqs_pending++;
+ handle->write_reqs_pending++;
+ POST_COMPLETION_FOR_REQ(loop, req);
+ return 0;
+ } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
+ req->write_buffer = bufs[0];
+ uv_insert_non_overlapped_write_req(handle, req);
+ if (handle->write_reqs_pending == 0) {
+ uv_queue_non_overlapped_write(handle);
+ }
+
+ /* Request queued by the kernel. */
+ req->queued_bytes = uv_count_bufs(bufs, nbufs);
+ handle->write_queue_size += req->queued_bytes;
+ } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
+ /* Using overlapped IO, but wait for completion before returning */
+ req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
+ if (!req->overlapped.hEvent) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+
+ result = WriteFile(handle->handle,
+ bufs[0].base,
+ bufs[0].len,
+ NULL,
+ &req->overlapped);
+
+ if (!result && GetLastError() != ERROR_IO_PENDING) {
+ err = GetLastError();
+ CloseHandle(req->overlapped.hEvent);
+ return err;
+ }
+
+ if (result) {
+ /* Request completed immediately. */
+ req->queued_bytes = 0;
+ } else {
+ /* Request queued by the kernel. */
+ if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
+ WAIT_OBJECT_0) {
+ err = GetLastError();
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ return uv_translate_sys_error(err);
+ }
+ }
+ CloseHandle(req->overlapped.hEvent);
+
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ handle->reqs_pending++;
+ handle->write_reqs_pending++;
+ POST_COMPLETION_FOR_REQ(loop, req);
+ return 0;
+ } else {
+ result = WriteFile(handle->handle,
+ bufs[0].base,
+ bufs[0].len,
+ NULL,
+ &req->overlapped);
+
+ if (!result && GetLastError() != ERROR_IO_PENDING) {
+ return GetLastError();
+ }
+
+ if (result) {
+ /* Request completed immediately. */
+ req->queued_bytes = 0;
+ } else {
+ /* Request queued by the kernel. */
+ req->queued_bytes = uv_count_bufs(bufs, nbufs);
+ handle->write_queue_size += req->queued_bytes;
+ }
+
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ req->event_handle = CreateEvent(NULL, 0, 0, NULL);
+ if (!req->event_handle) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+ if (!RegisterWaitForSingleObject(&req->wait_handle,
+ req->overlapped.hEvent, post_completion_write_wait, (void*) req,
+ INFINITE, WT_EXECUTEINWAITTHREAD)) {
+ return GetLastError();
+ }
+ }
+ }
+
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ handle->reqs_pending++;
+ handle->write_reqs_pending++;
+
+ return 0;
+}
+
+
+int uv_pipe_write(uv_loop_t* loop,
+ uv_write_t* req,
+ uv_pipe_t* handle,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_write_cb cb) {
+ return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
+}
+
+
+int uv_pipe_write2(uv_loop_t* loop,
+ uv_write_t* req,
+ uv_pipe_t* handle,
+ const uv_buf_t bufs[],
+ unsigned int nbufs,
+ uv_stream_t* send_handle,
+ uv_write_cb cb) {
+ if (!handle->ipc) {
+ return WSAEINVAL;
+ }
+
+ return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);
+}
+
+
+static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
+ uv_buf_t buf) {
+ /* If there is an eof timer running, we don't need it any more, */
+ /* so discard it. */
+ eof_timer_destroy(handle);
+
+ handle->flags &= ~UV_HANDLE_READABLE;
+ uv_read_stop((uv_stream_t*) handle);
+
+ if (handle->read2_cb) {
+ handle->read2_cb(handle, UV_EOF, &uv_null_buf_, UV_UNKNOWN_HANDLE);
+ } else {
+ handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_);
+ }
+}
+
+
+static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
+ uv_buf_t buf) {
+ /* If there is an eof timer running, we don't need it any more, */
+ /* so discard it. */
+ eof_timer_destroy(handle);
+
+ uv_read_stop((uv_stream_t*) handle);
+
+ if (handle->read2_cb) {
+ handle->read2_cb(handle,
+ uv_translate_sys_error(error),
+ &buf,
+ UV_UNKNOWN_HANDLE);
+ } else {
+ handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
+ }
+}
+
+
+static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
+ int error, uv_buf_t buf) {
+ if (error == ERROR_BROKEN_PIPE) {
+ uv_pipe_read_eof(loop, handle, buf);
+ } else {
+ uv_pipe_read_error(loop, handle, error, buf);
+ }
+}
+
+
+void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
+ uv_req_t* req) {
+ DWORD bytes, avail;
+ uv_buf_t buf;
+ uv_ipc_frame_uv_stream ipc_frame;
+
+ assert(handle->type == UV_NAMED_PIPE);
+
+ handle->flags &= ~UV_HANDLE_READ_PENDING;
+ eof_timer_stop(handle);
+
+ if (!REQ_SUCCESS(req)) {
+ /* An error occurred doing the 0-read. */
+ if (handle->flags & UV_HANDLE_READING) {
+ uv_pipe_read_error_or_eof(loop,
+ handle,
+ GET_REQ_ERROR(req),
+ uv_null_buf_);
+ }
+ } else {
+ /* Do non-blocking reads until the buffer is empty */
+ while (handle->flags & UV_HANDLE_READING) {
+ if (!PeekNamedPipe(handle->handle,
+ NULL,
+ 0,
+ NULL,
+ &avail,
+ NULL)) {
+ uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
+ break;
+ }
+
+ if (avail == 0) {
+ /* There is nothing to read after all. */
+ break;
+ }
+
+ if (handle->ipc) {
+ /* Use the IPC framing protocol to read the incoming data. */
+ if (handle->remaining_ipc_rawdata_bytes == 0) {
+ /* We're reading a new frame. First, read the header. */
+ assert(avail >= sizeof(ipc_frame.header));
+
+ if (!ReadFile(handle->handle,
+ &ipc_frame.header,
+ sizeof(ipc_frame.header),
+ &bytes,
+ NULL)) {
+ uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
+ uv_null_buf_);
+ break;
+ }
+
+ assert(bytes == sizeof(ipc_frame.header));
+ assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
+ UV_IPC_TCP_CONNECTION));
+
+ if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
+ assert(avail - sizeof(ipc_frame.header) >=
+ sizeof(ipc_frame.socket_info));
+
+ /* Read the TCP socket info. */
+ if (!ReadFile(handle->handle,
+ &ipc_frame.socket_info,
+ sizeof(ipc_frame) - sizeof(ipc_frame.header),
+ &bytes,
+ NULL)) {
+ uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
+ uv_null_buf_);
+ break;
+ }
+
+ assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
+
+ /* Store the pending socket info. */
+ assert(!handle->pending_ipc_info.socket_info);
+ handle->pending_ipc_info.socket_info =
+ (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
+ if (!handle->pending_ipc_info.socket_info) {
+ uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
+ }
+
+ *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
+ handle->pending_ipc_info.tcp_connection =
+ ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
+ }
+
+ if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
+ handle->remaining_ipc_rawdata_bytes =
+ ipc_frame.header.raw_data_length;
+ continue;
+ }
+ } else {
+ avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
+ }
+ }
+
+ handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
+ if (buf.len == 0) {
+ if (handle->read2_cb) {
+ handle->read2_cb(handle, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
+ } else if (handle->read_cb) {
+ handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
+ }
+ break;
+ }
+ assert(buf.base != NULL);
+
+ if (ReadFile(handle->handle,
+ buf.base,
+ buf.len,
+ &bytes,
+ NULL)) {
+ /* Successful read */
+ if (handle->ipc) {
+ assert(handle->remaining_ipc_rawdata_bytes >= bytes);
+ handle->remaining_ipc_rawdata_bytes =
+ handle->remaining_ipc_rawdata_bytes - bytes;
+ if (handle->read2_cb) {
+ handle->read2_cb(handle, bytes, &buf,
+ handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
+ } else if (handle->read_cb) {
+ handle->read_cb((uv_stream_t*)handle, bytes, &buf);
+ }
+
+ if (handle->pending_ipc_info.socket_info) {
+ free(handle->pending_ipc_info.socket_info);
+ handle->pending_ipc_info.socket_info = NULL;
+ }
+ } else {
+ handle->read_cb((uv_stream_t*)handle, bytes, &buf);
+ }
+
+ /* Read again only if bytes == buf.len */
+ if (bytes <= buf.len) {
+ break;
+ }
+ } else {
+ uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
+ break;
+ }
+ }
+
+ /* Post another 0-read if still reading and not closing. */
+ if ((handle->flags & UV_HANDLE_READING) &&
+ !(handle->flags & UV_HANDLE_READ_PENDING)) {
+ uv_pipe_queue_read(loop, handle);
+ }
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+}
+
+
+void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
+ uv_write_t* req) {
+ int err;
+
+ assert(handle->type == UV_NAMED_PIPE);
+
+ assert(handle->write_queue_size >= req->queued_bytes);
+ handle->write_queue_size -= req->queued_bytes;
+
+ UNREGISTER_HANDLE_REQ(loop, handle, req);
+
+ if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
+ if (req->wait_handle != INVALID_HANDLE_VALUE) {
+ UnregisterWait(req->wait_handle);
+ req->wait_handle = INVALID_HANDLE_VALUE;
+ }
+ if (req->event_handle) {
+ CloseHandle(req->event_handle);
+ req->event_handle = NULL;
+ }
+ }
+
+ if (req->ipc_header) {
+ if (req == &handle->ipc_header_write_req) {
+ req->type = UV_UNKNOWN_REQ;
+ } else {
+ free(req);
+ }
+ } else {
+ if (req->cb) {
+ err = GET_REQ_ERROR(req);
+ req->cb(req, uv_translate_sys_error(err));
+ }
+ }
+
+ handle->write_reqs_pending--;
+
+ if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
+ handle->non_overlapped_writes_tail) {
+ assert(handle->write_reqs_pending > 0);
+ uv_queue_non_overlapped_write(handle);
+ }
+
+ if (handle->shutdown_req != NULL &&
+ handle->write_reqs_pending == 0) {
+ uv_want_endgame(loop, (uv_handle_t*)handle);
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+}
+
+
+void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
+ uv_req_t* raw_req) {
+ uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
+
+ assert(handle->type == UV_NAMED_PIPE);
+
+ if (handle->flags & UV__HANDLE_CLOSING) {
+ /* The req->pipeHandle should be freed already in uv_pipe_cleanup(). */
+ assert(req->pipeHandle == INVALID_HANDLE_VALUE);
+ DECREASE_PENDING_REQ_COUNT(handle);
+ return;
+ }
+
+ if (REQ_SUCCESS(req)) {
+ assert(req->pipeHandle != INVALID_HANDLE_VALUE);
+ req->next_pending = handle->pending_accepts;
+ handle->pending_accepts = req;
+
+ if (handle->connection_cb) {
+ handle->connection_cb((uv_stream_t*)handle, 0);
+ }
+ } else {
+ if (req->pipeHandle != INVALID_HANDLE_VALUE) {
+ CloseHandle(req->pipeHandle);
+ req->pipeHandle = INVALID_HANDLE_VALUE;
+ }
+ if (!(handle->flags & UV__HANDLE_CLOSING)) {
+ uv_pipe_queue_accept(loop, handle, req, FALSE);
+ }
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+}
+
+
+void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
+ uv_connect_t* req) {
+ int err;
+
+ assert(handle->type == UV_NAMED_PIPE);
+
+ UNREGISTER_HANDLE_REQ(loop, handle, req);
+
+ if (req->cb) {
+ err = 0;
+ if (REQ_SUCCESS(req)) {
+ uv_pipe_connection_init(handle);
+ } else {
+ err = GET_REQ_ERROR(req);
+ }
+ req->cb(req, uv_translate_sys_error(err));
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+}
+
+
+void uv_process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
+ uv_shutdown_t* req) {
+ assert(handle->type == UV_NAMED_PIPE);
+
+ UNREGISTER_HANDLE_REQ(loop, handle, req);
+
+ if (handle->flags & UV_HANDLE_READABLE) {
+ /* Initialize and optionally start the eof timer. Only do this if the */
+ /* pipe is readable and we haven't seen EOF come in ourselves. */
+ eof_timer_init(handle);
+
+ /* If reading start the timer right now. */
+ /* Otherwise uv_pipe_queue_read will start it. */
+ if (handle->flags & UV_HANDLE_READ_PENDING) {
+ eof_timer_start(handle);
+ }
+
+ } else {
+ /* This pipe is not readable. We can just close it to let the other end */
+ /* know that we're done writing. */
+ CloseHandle(handle->handle);
+ handle->handle = INVALID_HANDLE_VALUE;
+ }
+
+ if (req->cb) {
+ req->cb(req, 0);
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+}
+
+
+static void eof_timer_init(uv_pipe_t* pipe) {
+ int r;
+
+ assert(pipe->eof_timer == NULL);
+ assert(pipe->flags & UV_HANDLE_CONNECTION);
+
+ pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer);
+
+ r = uv_timer_init(pipe->loop, pipe->eof_timer);
+ assert(r == 0); /* timers can't fail */
+ pipe->eof_timer->data = pipe;
+ uv_unref((uv_handle_t*) pipe->eof_timer);
+}
+
+
+static void eof_timer_start(uv_pipe_t* pipe) {
+ assert(pipe->flags & UV_HANDLE_CONNECTION);
+
+ if (pipe->eof_timer != NULL) {
+ uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0);
+ }
+}
+
+
+static void eof_timer_stop(uv_pipe_t* pipe) {
+ assert(pipe->flags & UV_HANDLE_CONNECTION);
+
+ if (pipe->eof_timer != NULL) {
+ uv_timer_stop(pipe->eof_timer);
+ }
+}
+
+
+static void eof_timer_cb(uv_timer_t* timer, int status) {
+ uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
+ uv_loop_t* loop = timer->loop;
+
+ assert(status == 0); /* timers can't fail */
+ assert(pipe->type == UV_NAMED_PIPE);
+
+ /* This should always be true, since we start the timer only */
+ /* in uv_pipe_queue_read after successfully calling ReadFile, */
+ /* or in uv_process_pipe_shutdown_req if a read is pending, */
+ /* and we always immediately stop the timer in */
+ /* uv_process_pipe_read_req. */
+ assert(pipe->flags & UV_HANDLE_READ_PENDING);
+
+ /* If there are many packets coming off the iocp then the timer callback */
+ /* may be called before the read request is coming off the queue. */
+ /* Therefore we check here if the read request has completed but will */
+ /* be processed later. */
+ if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
+ HasOverlappedIoCompleted(&pipe->read_req.overlapped)) {
+ return;
+ }
+
+ /* Force both ends off the pipe. */
+ CloseHandle(pipe->handle);
+ pipe->handle = INVALID_HANDLE_VALUE;
+
+ /* Stop reading, so the pending read that is going to fail will */
+ /* not be reported to the user. */
+ uv_read_stop((uv_stream_t*) pipe);
+
+ /* Report the eof and update flags. This will get reported even if the */
+ /* user stopped reading in the meantime. TODO: is that okay? */
+ uv_pipe_read_eof(loop, pipe, uv_null_buf_);
+}
+
+
+static void eof_timer_destroy(uv_pipe_t* pipe) {
+ assert(pipe->flags && UV_HANDLE_CONNECTION);
+
+ if (pipe->eof_timer) {
+ uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb);
+ pipe->eof_timer = NULL;
+ }
+}
+
+
+static void eof_timer_close_cb(uv_handle_t* handle) {
+ assert(handle->type == UV_TIMER);
+ free(handle);
+}
+
+
+int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
+ HANDLE os_handle = uv__get_osfhandle(file);
+
+ if (os_handle == INVALID_HANDLE_VALUE ||
+ uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
+ return UV_EINVAL;
+ }
+
+ uv_pipe_connection_init(pipe);
+ pipe->handle = os_handle;
+ pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
+
+ if (pipe->ipc) {
+ assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
+ pipe->ipc_pid = uv_parent_pid();
+ assert(pipe->ipc_pid != -1);
+ }
+ return 0;
+}