ext/libuv/src/win/pipe.c in libuv-1.2.0 vs ext/libuv/src/win/pipe.c in libuv-1.3.0

- old
+ new

@@ -93,32 +93,32 @@ uv_stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; handle->name = NULL; - handle->ipc_pid = 0; - handle->remaining_ipc_rawdata_bytes = 0; - QUEUE_INIT(&handle->pending_ipc_info.queue); - handle->pending_ipc_info.queue_len = 0; + handle->pipe.conn.ipc_pid = 0; + handle->pipe.conn.remaining_ipc_rawdata_bytes = 0; + QUEUE_INIT(&handle->pipe.conn.pending_ipc_info.queue); + handle->pipe.conn.pending_ipc_info.queue_len = 0; handle->ipc = ipc; - handle->non_overlapped_writes_tail = NULL; - handle->readfile_thread = NULL; + handle->pipe.conn.non_overlapped_writes_tail = NULL; + handle->pipe.conn.readfile_thread = NULL; - uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); + uv_req_init(loop, (uv_req_t*) &handle->pipe.conn.ipc_header_write_req); return 0; } static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); handle->read_req.data = handle; - handle->eof_timer = NULL; + handle->pipe.conn.eof_timer = NULL; assert(!(handle->flags & UV_HANDLE_PIPESERVER)); if (pCancelSynchronousIo && handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { - uv_mutex_init(&handle->readfile_mutex); + uv_mutex_init(&handle->pipe.conn.readfile_mutex); handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE; } } @@ -328,20 +328,20 @@ FILE_PIPE_LOCAL_INFORMATION pipe_info; uv__ipc_queue_item_t* item; if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE; - uv_mutex_destroy(&handle->readfile_mutex); + uv_mutex_destroy(&handle->pipe.conn.readfile_mutex); } if ((handle->flags & UV_HANDLE_CONNECTION) && - handle->shutdown_req != NULL && - handle->write_reqs_pending == 0) { - req = handle->shutdown_req; + handle->stream.conn.shutdown_req != NULL && + handle->stream.conn.write_reqs_pending == 0) { + req = handle->stream.conn.shutdown_req; /* Clear the shutdown_req field so we don't go here again. */ - handle->shutdown_req = NULL; + handle->stream.conn.shutdown_req = NULL; if (handle->flags & UV__HANDLE_CLOSING) { UNREGISTER_HANDLE_REQ(loop, handle, req); /* Already closing. Cancel the shutdown. */ @@ -406,31 +406,31 @@ handle->reqs_pending == 0) { assert(!(handle->flags & UV_HANDLE_CLOSED)); if (handle->flags & UV_HANDLE_CONNECTION) { /* Free pending sockets */ - while (!QUEUE_EMPTY(&handle->pending_ipc_info.queue)) { + while (!QUEUE_EMPTY(&handle->pipe.conn.pending_ipc_info.queue)) { QUEUE* q; SOCKET socket; - q = QUEUE_HEAD(&handle->pending_ipc_info.queue); + q = QUEUE_HEAD(&handle->pipe.conn.pending_ipc_info.queue); QUEUE_REMOVE(q); item = QUEUE_DATA(q, uv__ipc_queue_item_t, member); /* Materialize socket and close it */ socket = WSASocketW(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, &item->socket_info_ex.socket_info, 0, WSA_FLAG_OVERLAPPED); - free(item); + uv__free(item); if (socket != INVALID_SOCKET) closesocket(socket); } - handle->pending_ipc_info.queue_len = 0; + handle->pipe.conn.pending_ipc_info.queue_len = 0; if (handle->flags & UV_HANDLE_EMULATE_IOCP) { if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { UnregisterWait(handle->read_req.wait_handle); handle->read_req.wait_handle = INVALID_HANDLE_VALUE; @@ -441,22 +441,22 @@ } } } if (handle->flags & UV_HANDLE_PIPESERVER) { - assert(handle->accept_reqs); - free(handle->accept_reqs); - handle->accept_reqs = NULL; + assert(handle->pipe.serv.accept_reqs); + uv__free(handle->pipe.serv.accept_reqs); + handle->pipe.serv.accept_reqs = NULL; } uv__handle_close(handle); } } void uv_pipe_pending_instances(uv_pipe_t* handle, int count) { - handle->pending_instances = count; + handle->pipe.serv.pending_instances = count; handle->flags |= UV_HANDLE_PIPESERVER; } /* Creates a pipe server. */ @@ -472,33 +472,33 @@ if (!name) { return UV_EINVAL; } if (!(handle->flags & UV_HANDLE_PIPESERVER)) { - handle->pending_instances = default_pending_pipe_instances; + handle->pipe.serv.pending_instances = default_pending_pipe_instances; } - handle->accept_reqs = (uv_pipe_accept_t*) - malloc(sizeof(uv_pipe_accept_t) * handle->pending_instances); - if (!handle->accept_reqs) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*) + uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances); + if (!handle->pipe.serv.accept_reqs) { + uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); } - for (i = 0; i < handle->pending_instances; i++) { - req = &handle->accept_reqs[i]; + for (i = 0; i < handle->pipe.serv.pending_instances; i++) { + req = &handle->pipe.serv.accept_reqs[i]; uv_req_init(loop, (uv_req_t*) req); req->type = UV_ACCEPT; req->data = handle; req->pipeHandle = INVALID_HANDLE_VALUE; req->next_pending = NULL; } /* Convert name to UTF16. */ nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR); - handle->name = (WCHAR*)malloc(nameSize); + handle->name = (WCHAR*)uv__malloc(nameSize); if (!handle->name) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); } if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) { err = GetLastError(); goto error; @@ -506,46 +506,49 @@ /* * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE. * If this fails then there's already a pipe server for the given pipe name. */ - handle->accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name, + handle->pipe.serv.accept_reqs[0].pipeHandle = CreateNamedPipeW(handle->name, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL); - if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) { + if (handle->pipe.serv.accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) { err = GetLastError(); if (err == ERROR_ACCESS_DENIED) { err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */ } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) { err = WSAEACCES; /* Translates to UV_EACCES. */ } goto error; } - if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) { + if (uv_set_pipe_handle(loop, + handle, + handle->pipe.serv.accept_reqs[0].pipeHandle, + 0)) { err = GetLastError(); goto error; } - handle->pending_accepts = NULL; + handle->pipe.serv.pending_accepts = NULL; handle->flags |= UV_HANDLE_PIPESERVER; handle->flags |= UV_HANDLE_BOUND; return 0; error: if (handle->name) { - free(handle->name); + uv__free(handle->name); handle->name = NULL; } - if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) { - CloseHandle(handle->accept_reqs[0].pipeHandle); - handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE; + if (handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) { + CloseHandle(handle->pipe.serv.accept_reqs[0].pipeHandle); + handle->pipe.serv.accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE; } return uv_translate_sys_error(err); } @@ -602,13 +605,13 @@ req->handle = (uv_stream_t*) handle; req->cb = cb; /* Convert name to UTF16. */ nameSize = uv_utf8_to_utf16(name, NULL, 0) * sizeof(WCHAR); - handle->name = (WCHAR*)malloc(nameSize); + handle->name = (WCHAR*)uv__malloc(nameSize); if (!handle->name) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); } if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) { err = GetLastError(); goto error; @@ -651,11 +654,11 @@ REGISTER_HANDLE_REQ(loop, handle, req); return; error: if (handle->name) { - free(handle->name); + uv__free(handle->name); handle->name = NULL; } if (pipeHandle != INVALID_HANDLE_VALUE) { CloseHandle(pipeHandle); @@ -675,27 +678,27 @@ /* Pause the ReadFile task briefly, to work around the Windows kernel bug that causes any access to a NamedPipe to deadlock if any process has called ReadFile */ HANDLE h; - uv_mutex_lock(&handle->readfile_mutex); - h = handle->readfile_thread; + uv_mutex_lock(&handle->pipe.conn.readfile_mutex); + h = handle->pipe.conn.readfile_thread; while (h) { /* spinlock: we expect this to finish quickly, or we are probably about to deadlock anyways (in the kernel), so it doesn't matter */ pCancelSynchronousIo(h); SwitchToThread(); /* yield thread control briefly */ - h = handle->readfile_thread; + h = handle->pipe.conn.readfile_thread; } } } void uv__pipe_unpause_read(uv_pipe_t* handle) { if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { - uv_mutex_unlock(&handle->readfile_mutex); + uv_mutex_unlock(&handle->pipe.conn.readfile_mutex); } } void uv__pipe_stop_read(uv_pipe_t* handle) { @@ -712,20 +715,20 @@ HANDLE pipeHandle; uv__pipe_stop_read(handle); if (handle->name) { - free(handle->name); + uv__free(handle->name); handle->name = NULL; } if (handle->flags & UV_HANDLE_PIPESERVER) { - for (i = 0; i < handle->pending_instances; i++) { - pipeHandle = handle->accept_reqs[i].pipeHandle; + for (i = 0; i < handle->pipe.serv.pending_instances; i++) { + pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle; if (pipeHandle != INVALID_HANDLE_VALUE) { CloseHandle(pipeHandle); - handle->accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE; + handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE; } } } if (handle->flags & UV_HANDLE_CONNECTION) { @@ -794,13 +797,13 @@ } assert(req->pipeHandle != INVALID_HANDLE_VALUE); /* Prepare the overlapped structure. */ - memset(&(req->overlapped), 0, sizeof(req->overlapped)); + memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped)); - if (!ConnectNamedPipe(req->pipeHandle, &req->overlapped) && + if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) && GetLastError() != ERROR_IO_PENDING) { if (GetLastError() == ERROR_PIPE_CONNECTED) { SET_REQ_SUCCESS(req); } else { CloseHandle(req->pipeHandle); @@ -824,34 +827,34 @@ QUEUE* q; uv__ipc_queue_item_t* item; int err; if (server->ipc) { - if (QUEUE_EMPTY(&server->pending_ipc_info.queue)) { + if (QUEUE_EMPTY(&server->pipe.conn.pending_ipc_info.queue)) { /* No valid pending sockets. */ return WSAEWOULDBLOCK; } - q = QUEUE_HEAD(&server->pending_ipc_info.queue); + q = QUEUE_HEAD(&server->pipe.conn.pending_ipc_info.queue); QUEUE_REMOVE(q); - server->pending_ipc_info.queue_len--; + server->pipe.conn.pending_ipc_info.queue_len--; item = QUEUE_DATA(q, uv__ipc_queue_item_t, member); err = uv_tcp_import((uv_tcp_t*)client, &item->socket_info_ex, item->tcp_connection); if (err != 0) return err; - free(item); + uv__free(item); } else { pipe_client = (uv_pipe_t*)client; /* Find a connection instance that has been connected, but not yet */ /* accepted. */ - req = server->pending_accepts; + req = server->pipe.serv.pending_accepts; if (!req) { /* No valid connections found, so we error out. */ return WSAEWOULDBLOCK; } @@ -860,11 +863,11 @@ uv_pipe_connection_init(pipe_client); pipe_client->handle = req->pipeHandle; pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; /* Prepare the req to pick up a new connection */ - server->pending_accepts = req->next_pending; + server->pipe.serv.pending_accepts = req->next_pending; req->next_pending = NULL; req->pipeHandle = INVALID_HANDLE_VALUE; if (!(server->flags & UV__HANDLE_CLOSING)) { uv_pipe_queue_accept(loop, server, req, FALSE); @@ -879,11 +882,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { uv_loop_t* loop = handle->loop; int i; if (handle->flags & UV_HANDLE_LISTENING) { - handle->connection_cb = cb; + handle->stream.serv.connection_cb = cb; } if (!(handle->flags & UV_HANDLE_BOUND)) { return WSAEINVAL; } @@ -896,17 +899,17 @@ return ERROR_NOT_SUPPORTED; } handle->flags |= UV_HANDLE_LISTENING; INCREASE_ACTIVE_COUNT(loop, handle); - handle->connection_cb = cb; + handle->stream.serv.connection_cb = cb; /* First pipe handle should have already been created in uv_pipe_bind */ - assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); + assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - for (i = 0; i < handle->pending_instances; i++) { - uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); + for (i = 0; i < handle->pipe.serv.pending_instances; i++) { + uv_pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0); } return 0; } @@ -917,22 +920,22 @@ uv_read_t* req = (uv_read_t*) parameter; uv_pipe_t* handle = (uv_pipe_t*) req->data; uv_loop_t* loop = handle->loop; HANDLE hThread = NULL; DWORD err; - uv_mutex_t *m = &handle->readfile_mutex; + uv_mutex_t *m = &handle->pipe.conn.readfile_mutex; assert(req != NULL); assert(req->type == UV_READ); assert(handle->type == UV_NAMED_PIPE); if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */ if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), GetCurrentProcess(), &hThread, 0, TRUE, DUPLICATE_SAME_ACCESS)) { - handle->readfile_thread = hThread; + handle->pipe.conn.readfile_thread = hThread; } else { hThread = NULL; } uv_mutex_unlock(m); } @@ -946,25 +949,25 @@ err = GetLastError(); if (err == ERROR_OPERATION_ABORTED && handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { if (handle->flags & UV_HANDLE_READING) { /* just a brief break to do something else */ - handle->readfile_thread = NULL; + handle->pipe.conn.readfile_thread = NULL; /* resume after it is finished */ uv_mutex_lock(m); - handle->readfile_thread = hThread; + handle->pipe.conn.readfile_thread = hThread; uv_mutex_unlock(m); goto restart_readfile; } else { result = 1; /* successfully stopped reading */ } } } if (hThread) { - assert(hThread == handle->readfile_thread); + assert(hThread == handle->pipe.conn.readfile_thread); /* mutex does not control clearing readfile_thread */ - handle->readfile_thread = NULL; + handle->pipe.conn.readfile_thread = NULL; uv_mutex_lock(m); /* only when we hold the mutex lock is it safe to open or close the handle */ CloseHandle(hThread); uv_mutex_unlock(m); @@ -1015,13 +1018,13 @@ handle = (uv_tcp_t*)req->data; assert(handle != NULL); assert(!timed_out); if (!PostQueuedCompletionStatus(handle->loop->iocp, - req->overlapped.InternalHigh, + req->u.io.overlapped.InternalHigh, 0, - &req->overlapped)) { + &req->u.io.overlapped)) { uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); } } @@ -1034,13 +1037,13 @@ handle = (uv_tcp_t*)req->handle; assert(handle != NULL); assert(!timed_out); if (!PostQueuedCompletionStatus(handle->loop->iocp, - req->overlapped.InternalHigh, + req->u.io.overlapped.InternalHigh, 0, - &req->overlapped)) { + &req->u.io.overlapped)) { uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); } } @@ -1062,21 +1065,21 @@ /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, GetLastError()); goto error; } } else { - memset(&req->overlapped, 0, sizeof(req->overlapped)); + memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); if (handle->flags & UV_HANDLE_EMULATE_IOCP) { - req->overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); + req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1); } /* Do 0-read */ result = ReadFile(handle->handle, &uv_zero_, 0, NULL, - &req->overlapped); + &req->u.io.overlapped); if (!result && GetLastError() != ERROR_IO_PENDING) { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, GetLastError()); goto error; @@ -1089,11 +1092,11 @@ uv_fatal_error(GetLastError(), "CreateEvent"); } } if (req->wait_handle == INVALID_HANDLE_VALUE) { if (!RegisterWaitForSingleObject(&req->wait_handle, - req->overlapped.hEvent, post_completion_read_wait, (void*) req, + req->u.io.overlapped.hEvent, post_completion_read_wait, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD)) { SET_REQ_ERROR(req, GetLastError()); goto error; } } @@ -1133,32 +1136,32 @@ static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle, uv_write_t* req) { req->next_req = NULL; - if (handle->non_overlapped_writes_tail) { + if (handle->pipe.conn.non_overlapped_writes_tail) { req->next_req = - handle->non_overlapped_writes_tail->next_req; - handle->non_overlapped_writes_tail->next_req = (uv_req_t*)req; - handle->non_overlapped_writes_tail = req; + handle->pipe.conn.non_overlapped_writes_tail->next_req; + handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req; + handle->pipe.conn.non_overlapped_writes_tail = req; } else { req->next_req = (uv_req_t*)req; - handle->non_overlapped_writes_tail = req; + handle->pipe.conn.non_overlapped_writes_tail = req; } } static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) { uv_write_t* req; - if (handle->non_overlapped_writes_tail) { - req = (uv_write_t*)handle->non_overlapped_writes_tail->next_req; + if (handle->pipe.conn.non_overlapped_writes_tail) { + req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req; - if (req == handle->non_overlapped_writes_tail) { - handle->non_overlapped_writes_tail = NULL; + if (req == handle->pipe.conn.non_overlapped_writes_tail) { + handle->pipe.conn.non_overlapped_writes_tail = NULL; } else { - handle->non_overlapped_writes_tail->next_req = + handle->pipe.conn.non_overlapped_writes_tail->next_req = req->next_req; } return req; } else { @@ -1211,21 +1214,21 @@ req->handle = (uv_stream_t*) handle; req->cb = cb; req->ipc_header = 0; req->event_handle = NULL; req->wait_handle = INVALID_HANDLE_VALUE; - memset(&req->overlapped, 0, sizeof(req->overlapped)); + memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); if (handle->ipc) { assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); ipc_frame.header.flags = 0; /* Use the IPC framing protocol. */ if (send_handle) { tcp_send_handle = (uv_tcp_t*)send_handle; - err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid, + err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid, &ipc_frame.socket_info_ex.socket_info); if (err) { return err; } @@ -1253,16 +1256,16 @@ } else { /* * Try to use the preallocated write req if it's available. * Otherwise allocate a new one. */ - if (handle->ipc_header_write_req.type != UV_WRITE) { - ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req; + if (handle->pipe.conn.ipc_header_write_req.type != UV_WRITE) { + ipc_header_req = (uv_write_t*)&handle->pipe.conn.ipc_header_write_req; } else { - ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t)); + ipc_header_req = (uv_write_t*)uv__malloc(sizeof(uv_write_t)); if (!ipc_header_req) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); } } uv_req_init(loop, (uv_req_t*) ipc_header_req); ipc_header_req->type = UV_WRITE; @@ -1270,47 +1273,48 @@ ipc_header_req->cb = NULL; ipc_header_req->ipc_header = 1; } /* Write the header or the whole frame. */ - memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped)); + memset(&ipc_header_req->u.io.overlapped, 0, + sizeof(ipc_header_req->u.io.overlapped)); /* Using overlapped IO, but wait for completion before returning. This write is blocking because ipc_frame is on stack. */ - ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL); - if (!ipc_header_req->overlapped.hEvent) { + ipc_header_req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL); + if (!ipc_header_req->u.io.overlapped.hEvent) { uv_fatal_error(GetLastError(), "CreateEvent"); } result = WriteFile(handle->handle, &ipc_frame, ipc_frame.header.flags & UV_IPC_TCP_SERVER ? sizeof(ipc_frame) : sizeof(ipc_frame.header), NULL, - &ipc_header_req->overlapped); + &ipc_header_req->u.io.overlapped); if (!result && GetLastError() != ERROR_IO_PENDING) { err = GetLastError(); - CloseHandle(ipc_header_req->overlapped.hEvent); + CloseHandle(ipc_header_req->u.io.overlapped.hEvent); return err; } if (!result) { /* Request not completed immediately. Wait for it.*/ - if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) != + if (WaitForSingleObject(ipc_header_req->u.io.overlapped.hEvent, INFINITE) != WAIT_OBJECT_0) { err = GetLastError(); - CloseHandle(ipc_header_req->overlapped.hEvent); + CloseHandle(ipc_header_req->u.io.overlapped.hEvent); return err; } } - ipc_header_req->queued_bytes = 0; - CloseHandle(ipc_header_req->overlapped.hEvent); - ipc_header_req->overlapped.hEvent = NULL; + ipc_header_req->u.io.queued_bytes = 0; + CloseHandle(ipc_header_req->u.io.overlapped.hEvent); + ipc_header_req->u.io.overlapped.hEvent = NULL; REGISTER_HANDLE_REQ(loop, handle, ipc_header_req); handle->reqs_pending++; - handle->write_reqs_pending++; + handle->stream.conn.write_reqs_pending++; /* If we don't have any raw data to write - we're done. */ if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { return 0; } @@ -1329,103 +1333,103 @@ if (!result) { err = GetLastError(); return err; } else { /* Request completed immediately. */ - req->queued_bytes = 0; + req->u.io.queued_bytes = 0; } REGISTER_HANDLE_REQ(loop, handle, req); handle->reqs_pending++; - handle->write_reqs_pending++; + handle->stream.conn.write_reqs_pending++; POST_COMPLETION_FOR_REQ(loop, req); return 0; } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { req->write_buffer = bufs[0]; uv_insert_non_overlapped_write_req(handle, req); - if (handle->write_reqs_pending == 0) { + if (handle->stream.conn.write_reqs_pending == 0) { uv_queue_non_overlapped_write(handle); } /* Request queued by the kernel. */ - req->queued_bytes = uv__count_bufs(bufs, nbufs); - handle->write_queue_size += req->queued_bytes; + req->u.io.queued_bytes = bufs[0].len; + handle->write_queue_size += req->u.io.queued_bytes; } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) { /* Using overlapped IO, but wait for completion before returning */ - req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL); - if (!req->overlapped.hEvent) { + req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL); + if (!req->u.io.overlapped.hEvent) { uv_fatal_error(GetLastError(), "CreateEvent"); } result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, NULL, - &req->overlapped); + &req->u.io.overlapped); if (!result && GetLastError() != ERROR_IO_PENDING) { err = GetLastError(); - CloseHandle(req->overlapped.hEvent); + CloseHandle(req->u.io.overlapped.hEvent); return err; } if (result) { /* Request completed immediately. */ - req->queued_bytes = 0; + req->u.io.queued_bytes = 0; } else { - assert(ipc_header_req != NULL); /* Request queued by the kernel. */ - if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) != + req->u.io.queued_bytes = bufs[0].len; + handle->write_queue_size += req->u.io.queued_bytes; + if (WaitForSingleObject(req->u.io.overlapped.hEvent, INFINITE) != WAIT_OBJECT_0) { err = GetLastError(); - CloseHandle(ipc_header_req->overlapped.hEvent); + CloseHandle(req->u.io.overlapped.hEvent); return uv_translate_sys_error(err); } } - CloseHandle(req->overlapped.hEvent); + CloseHandle(req->u.io.overlapped.hEvent); REGISTER_HANDLE_REQ(loop, handle, req); handle->reqs_pending++; - handle->write_reqs_pending++; - POST_COMPLETION_FOR_REQ(loop, req); + handle->stream.conn.write_reqs_pending++; return 0; } else { result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, NULL, - &req->overlapped); + &req->u.io.overlapped); if (!result && GetLastError() != ERROR_IO_PENDING) { return GetLastError(); } if (result) { /* Request completed immediately. */ - req->queued_bytes = 0; + req->u.io.queued_bytes = 0; } else { /* Request queued by the kernel. */ - req->queued_bytes = uv__count_bufs(bufs, nbufs); - handle->write_queue_size += req->queued_bytes; + req->u.io.queued_bytes = bufs[0].len; + handle->write_queue_size += req->u.io.queued_bytes; } if (handle->flags & UV_HANDLE_EMULATE_IOCP) { req->event_handle = CreateEvent(NULL, 0, 0, NULL); if (!req->event_handle) { uv_fatal_error(GetLastError(), "CreateEvent"); } if (!RegisterWaitForSingleObject(&req->wait_handle, - req->overlapped.hEvent, post_completion_write_wait, (void*) req, + req->u.io.overlapped.hEvent, post_completion_write_wait, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD)) { return GetLastError(); } } } REGISTER_HANDLE_REQ(loop, handle, req); handle->reqs_pending++; - handle->write_reqs_pending++; + handle->stream.conn.write_reqs_pending++; return 0; } @@ -1492,18 +1496,18 @@ void uv__pipe_insert_pending_socket(uv_pipe_t* handle, uv__ipc_socket_info_ex* info, int tcp_connection) { uv__ipc_queue_item_t* item; - item = (uv__ipc_queue_item_t*) malloc(sizeof(*item)); + item = (uv__ipc_queue_item_t*) uv__malloc(sizeof(*item)); if (item == NULL) - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); memcpy(&item->socket_info_ex, info, sizeof(item->socket_info_ex)); item->tcp_connection = tcp_connection; - QUEUE_INSERT_TAIL(&handle->pending_ipc_info.queue, &item->member); - handle->pending_ipc_info.queue_len++; + QUEUE_INSERT_TAIL(&handle->pipe.conn.pending_ipc_info.queue, &item->member); + handle->pipe.conn.pending_ipc_info.queue_len++; } void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req) { @@ -1542,11 +1546,11 @@ break; } if (handle->ipc) { /* Use the IPC framing protocol to read the incoming data. */ - if (handle->remaining_ipc_rawdata_bytes == 0) { + if (handle->pipe.conn.remaining_ipc_rawdata_bytes == 0) { /* We're reading a new frame. First, read the header. */ assert(avail >= sizeof(ipc_frame.header)); if (!ReadFile(handle->handle, &ipc_frame.header, @@ -1585,16 +1589,16 @@ &ipc_frame.socket_info_ex, ipc_frame.header.flags & UV_IPC_TCP_CONNECTION); } if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { - handle->remaining_ipc_rawdata_bytes = + handle->pipe.conn.remaining_ipc_rawdata_bytes = ipc_frame.header.raw_data_length; continue; } } else { - avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes); + avail = min(avail, (DWORD)handle->pipe.conn.remaining_ipc_rawdata_bytes); } } handle->alloc_cb((uv_handle_t*) handle, avail, &buf); if (buf.len == 0) { @@ -1608,13 +1612,13 @@ buf.len, &bytes, NULL)) { /* Successful read */ if (handle->ipc) { - assert(handle->remaining_ipc_rawdata_bytes >= bytes); - handle->remaining_ipc_rawdata_bytes = - handle->remaining_ipc_rawdata_bytes - bytes; + assert(handle->pipe.conn.remaining_ipc_rawdata_bytes >= bytes); + handle->pipe.conn.remaining_ipc_rawdata_bytes = + handle->pipe.conn.remaining_ipc_rawdata_bytes - bytes; } handle->read_cb((uv_stream_t*)handle, bytes, &buf); /* Read again only if bytes == buf.len */ if (bytes <= buf.len) { @@ -1641,12 +1645,12 @@ uv_write_t* req) { int err; assert(handle->type == UV_NAMED_PIPE); - assert(handle->write_queue_size >= req->queued_bytes); - handle->write_queue_size -= req->queued_bytes; + assert(handle->write_queue_size >= req->u.io.queued_bytes); + handle->write_queue_size -= req->u.io.queued_bytes; UNREGISTER_HANDLE_REQ(loop, handle, req); if (handle->flags & UV_HANDLE_EMULATE_IOCP) { if (req->wait_handle != INVALID_HANDLE_VALUE) { @@ -1658,32 +1662,32 @@ req->event_handle = NULL; } } if (req->ipc_header) { - if (req == &handle->ipc_header_write_req) { + if (req == &handle->pipe.conn.ipc_header_write_req) { req->type = UV_UNKNOWN_REQ; } else { - free(req); + uv__free(req); } } else { if (req->cb) { err = GET_REQ_ERROR(req); req->cb(req, uv_translate_sys_error(err)); } } - handle->write_reqs_pending--; + handle->stream.conn.write_reqs_pending--; if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && - handle->non_overlapped_writes_tail) { - assert(handle->write_reqs_pending > 0); + handle->pipe.conn.non_overlapped_writes_tail) { + assert(handle->stream.conn.write_reqs_pending > 0); uv_queue_non_overlapped_write(handle); } - if (handle->shutdown_req != NULL && - handle->write_reqs_pending == 0) { + if (handle->stream.conn.shutdown_req != NULL && + handle->stream.conn.write_reqs_pending == 0) { uv_want_endgame(loop, (uv_handle_t*)handle); } DECREASE_PENDING_REQ_COUNT(handle); } @@ -1702,15 +1706,15 @@ return; } if (REQ_SUCCESS(req)) { assert(req->pipeHandle != INVALID_HANDLE_VALUE); - req->next_pending = handle->pending_accepts; - handle->pending_accepts = req; + req->next_pending = handle->pipe.serv.pending_accepts; + handle->pipe.serv.pending_accepts = req; - if (handle->connection_cb) { - handle->connection_cb((uv_stream_t*)handle, 0); + if (handle->stream.serv.connection_cb) { + handle->stream.serv.connection_cb((uv_stream_t*)handle, 0); } } else { if (req->pipeHandle != INVALID_HANDLE_VALUE) { CloseHandle(req->pipeHandle); req->pipeHandle = INVALID_HANDLE_VALUE; @@ -1779,36 +1783,36 @@ static void eof_timer_init(uv_pipe_t* pipe) { int r; - assert(pipe->eof_timer == NULL); + assert(pipe->pipe.conn.eof_timer == NULL); assert(pipe->flags & UV_HANDLE_CONNECTION); - pipe->eof_timer = (uv_timer_t*) malloc(sizeof *pipe->eof_timer); + pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer); - r = uv_timer_init(pipe->loop, pipe->eof_timer); + r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer); assert(r == 0); /* timers can't fail */ - pipe->eof_timer->data = pipe; - uv_unref((uv_handle_t*) pipe->eof_timer); + pipe->pipe.conn.eof_timer->data = pipe; + uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer); } static void eof_timer_start(uv_pipe_t* pipe) { assert(pipe->flags & UV_HANDLE_CONNECTION); - if (pipe->eof_timer != NULL) { - uv_timer_start(pipe->eof_timer, eof_timer_cb, eof_timeout, 0); + if (pipe->pipe.conn.eof_timer != NULL) { + uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0); } } static void eof_timer_stop(uv_pipe_t* pipe) { assert(pipe->flags & UV_HANDLE_CONNECTION); - if (pipe->eof_timer != NULL) { - uv_timer_stop(pipe->eof_timer); + if (pipe->pipe.conn.eof_timer != NULL) { + uv_timer_stop(pipe->pipe.conn.eof_timer); } } static void eof_timer_cb(uv_timer_t* timer) { @@ -1827,11 +1831,11 @@ /* If there are many packets coming off the iocp then the timer callback */ /* may be called before the read request is coming off the queue. */ /* Therefore we check here if the read request has completed but will */ /* be processed later. */ if ((pipe->flags & UV_HANDLE_READ_PENDING) && - HasOverlappedIoCompleted(&pipe->read_req.overlapped)) { + HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) { return; } /* Force both ends off the pipe. */ CloseHandle(pipe->handle); @@ -1846,22 +1850,22 @@ uv_pipe_read_eof(loop, pipe, uv_null_buf_); } static void eof_timer_destroy(uv_pipe_t* pipe) { - assert(pipe->flags && UV_HANDLE_CONNECTION); + assert(pipe->flags & UV_HANDLE_CONNECTION); - if (pipe->eof_timer) { - uv_close((uv_handle_t*) pipe->eof_timer, eof_timer_close_cb); - pipe->eof_timer = NULL; + if (pipe->pipe.conn.eof_timer) { + uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb); + pipe->pipe.conn.eof_timer = NULL; } } static void eof_timer_close_cb(uv_handle_t* handle) { assert(handle->type == UV_TIMER); - free(handle); + uv__free(handle); } int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { HANDLE os_handle = uv__get_osfhandle(file); @@ -1901,18 +1905,18 @@ uv_pipe_connection_init(pipe); if (pipe->ipc) { assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); - pipe->ipc_pid = uv_parent_pid(); - assert(pipe->ipc_pid != -1); + pipe->pipe.conn.ipc_pid = uv_parent_pid(); + assert(pipe->pipe.conn.ipc_pid != -1); } return 0; } -int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) { +static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) { NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_NAME_INFORMATION tmp_name_info; FILE_NAME_INFORMATION* name_info; WCHAR* name_buf; @@ -1922,11 +1926,11 @@ int err; name_info = NULL; if (handle->handle == INVALID_HANDLE_VALUE) { - *len = 0; + *size = 0; return UV_EINVAL; } uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */ @@ -1935,13 +1939,13 @@ &tmp_name_info, sizeof tmp_name_info, FileNameInformation); if (nt_status == STATUS_BUFFER_OVERFLOW) { name_size = sizeof(*name_info) + tmp_name_info.FileNameLength; - name_info = malloc(name_size); + name_info = uv__malloc(name_size); if (!name_info) { - *len = 0; + *size = 0; err = UV_ENOMEM; goto cleanup; } nt_status = pNtQueryInformationFile(handle->handle, @@ -1950,11 +1954,11 @@ name_size, FileNameInformation); } if (nt_status != STATUS_SUCCESS) { - *len = 0; + *size = 0; err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status)); goto error; } if (!name_info) { @@ -1965,11 +1969,11 @@ name_buf = name_info->FileName; name_len = name_info->FileNameLength; } if (name_len == 0) { - *len = 0; + *size = 0; err = 0; goto error; } name_len /= sizeof(WCHAR); @@ -1982,61 +1986,86 @@ NULL, 0, NULL, NULL); if (!addrlen) { - *len = 0; + *size = 0; err = uv_translate_sys_error(GetLastError()); goto error; - } else if (pipe_prefix_len + addrlen + 1 > *len) { - /* "\\\\.\\pipe" + name + '\0' */ - *len = pipe_prefix_len + addrlen + 1; + } else if (pipe_prefix_len + addrlen > *size) { + /* "\\\\.\\pipe" + name */ + *size = pipe_prefix_len + addrlen; err = UV_ENOBUFS; goto error; } - memcpy(buf, pipe_prefix, pipe_prefix_len); + memcpy(buffer, pipe_prefix, pipe_prefix_len); addrlen = WideCharToMultiByte(CP_UTF8, 0, name_buf, name_len, - buf+pipe_prefix_len, - *len-pipe_prefix_len, + buffer+pipe_prefix_len, + *size-pipe_prefix_len, NULL, NULL); if (!addrlen) { - *len = 0; + *size = 0; err = uv_translate_sys_error(GetLastError()); goto error; } addrlen += pipe_prefix_len; - buf[addrlen++] = '\0'; - *len = addrlen; + *size = addrlen; err = 0; goto cleanup; error: - free(name_info); + uv__free(name_info); cleanup: uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */ return err; } int uv_pipe_pending_count(uv_pipe_t* handle) { if (!handle->ipc) return 0; - return handle->pending_ipc_info.queue_len; + return handle->pipe.conn.pending_ipc_info.queue_len; } +int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) { + if (handle->flags & UV_HANDLE_BOUND) + return uv__pipe_getname(handle, buffer, size); + + if (handle->flags & UV_HANDLE_CONNECTION || + handle->handle != INVALID_HANDLE_VALUE) { + *size = 0; + return 0; + } + + return UV_EBADF; +} + + +int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) { + /* emulate unix behaviour */ + if (handle->flags & UV_HANDLE_BOUND) + return UV_ENOTCONN; + + if (handle->handle != INVALID_HANDLE_VALUE) + return uv__pipe_getname(handle, buffer, size); + + return UV_EBADF; +} + + uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) { if (!handle->ipc) return UV_UNKNOWN_HANDLE; - if (handle->pending_ipc_info.queue_len == 0) + if (handle->pipe.conn.pending_ipc_info.queue_len == 0) return UV_UNKNOWN_HANDLE; else return UV_TCP; }