ext/libuv/src/win/pipe.c in uvrb-0.1.4 vs ext/libuv/src/win/pipe.c in uvrb-0.2.0

- old
+ new

@@ -21,10 +21,11 @@ #include <assert.h> #include <io.h> #include <string.h> #include <stdio.h> +#include <stdlib.h> #include "uv.h" #include "internal.h" #include "handle-inl.h" #include "stream-inl.h" @@ -155,15 +156,14 @@ return INVALID_HANDLE_VALUE; } -uv_err_t uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, +int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, char* name, size_t nameSize) { HANDLE pipeHandle; - int errorno; - uv_err_t err; + int err; char* ptr = (char*)handle; for (;;) { uv_unique_pipe_name(ptr, name, nameSize); @@ -175,13 +175,12 @@ if (pipeHandle != INVALID_HANDLE_VALUE) { /* No name collisions. We're done. */ break; } - errorno = GetLastError(); - if (errorno != ERROR_PIPE_BUSY && errorno != ERROR_ACCESS_DENIED) { - err = uv__new_sys_error(errorno); + err = GetLastError(); + if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) { goto error; } /* Pipe name collision. Increment the pointer and try again. */ ptr++; @@ -189,18 +188,18 @@ if (CreateIoCompletionPort(pipeHandle, loop->iocp, (ULONG_PTR)handle, 0) == NULL) { - err = uv__new_sys_error(GetLastError()); + err = GetLastError(); goto error; } uv_pipe_connection_init(handle); handle->handle = pipeHandle; - return uv_ok_; + return 0; error: if (pipeHandle != INVALID_HANDLE_VALUE) { CloseHandle(pipeHandle); } @@ -276,10 +275,11 @@ return 0; } void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { + int err; DWORD result; uv_shutdown_t* req; NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; @@ -295,12 +295,11 @@ if (handle->flags & UV__HANDLE_CLOSING) { UNREGISTER_HANDLE_REQ(loop, handle, req); /* Already closing. Cancel the shutdown. */ if (req->cb) { - uv__set_artificial_error(loop, UV_ECANCELED); - req->cb(req, -1); + req->cb(req, UV_ECANCELED); } DECREASE_PENDING_REQ_COUNT(handle); return; } @@ -316,12 +315,12 @@ /* Failure */ UNREGISTER_HANDLE_REQ(loop, handle, req); handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ if (req->cb) { - uv__set_sys_error(loop, pRtlNtStatusToDosError(nt_status)); - req->cb(req, -1); + err = pRtlNtStatusToDosError(nt_status); + req->cb(req, uv_translate_sys_error(err)); } DECREASE_PENDING_REQ_COUNT(handle); return; } @@ -343,12 +342,12 @@ /* Failure. */ UNREGISTER_HANDLE_REQ(loop, handle, req); handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */ if (req->cb) { - uv__set_sys_error(loop, GetLastError()); - req->cb(req, -1); + err = GetLastError(); + req->cb(req, uv_translate_sys_error(err)); } DECREASE_PENDING_REQ_COUNT(handle); return; } @@ -394,21 +393,19 @@ /* Creates a pipe server. */ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { uv_loop_t* loop = handle->loop; - int i, errorno, nameSize; + int i, err, nameSize; uv_pipe_accept_t* req; if (handle->flags & UV_HANDLE_BOUND) { - uv__set_sys_error(loop, WSAEINVAL); - return -1; + return UV_EINVAL; } if (!name) { - uv__set_sys_error(loop, WSAEINVAL); - return -1; + return UV_EINVAL; } if (!(handle->flags & UV_HANDLE_PIPESERVER)) { handle->pending_instances = default_pending_pipe_instances; } @@ -434,12 +431,11 @@ if (!handle->name) { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); } if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) { - uv__set_sys_error(loop, GetLastError()); - return -1; + return uv_translate_sys_error(GetLastError()); } /* * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE. * If this fails then there's already a pipe server for the given pipe name. @@ -449,23 +445,21 @@ FILE_FLAG_FIRST_PIPE_INSTANCE, PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL); if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) { - errorno = GetLastError(); - if (errorno == ERROR_ACCESS_DENIED) { - uv__set_error(loop, UV_EADDRINUSE, errorno); - } else if (errorno == ERROR_PATH_NOT_FOUND || errorno == ERROR_INVALID_NAME) { - uv__set_error(loop, UV_EACCES, errorno); - } else { - uv__set_sys_error(loop, errorno); + err = GetLastError(); + if (err == ERROR_ACCESS_DENIED) { + err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */ + } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) { + err = WSAEACCES; /* Translates to UV_EACCES. */ } goto error; } if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) { - uv__set_sys_error(loop, GetLastError()); + err = GetLastError(); goto error; } handle->pending_accepts = NULL; handle->flags |= UV_HANDLE_PIPESERVER; @@ -482,11 +476,11 @@ if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) { CloseHandle(handle->accept_reqs[0].pipeHandle); handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE; } - return -1; + return uv_translate_sys_error(err); } static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { uv_loop_t* loop; @@ -529,11 +523,11 @@ void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, const char* name, uv_connect_cb cb) { uv_loop_t* loop = handle->loop; - int errorno, nameSize; + int err, nameSize; HANDLE pipeHandle = INVALID_HANDLE_VALUE; DWORD duplex_flags; uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; @@ -546,42 +540,42 @@ if (!handle->name) { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); } if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) { - errorno = GetLastError(); + err = GetLastError(); goto error; } pipeHandle = open_named_pipe(handle->name, &duplex_flags); if (pipeHandle == INVALID_HANDLE_VALUE) { if (GetLastError() == ERROR_PIPE_BUSY) { /* Wait for the server to make a pipe instance available. */ if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) { - errorno = GetLastError(); + err = GetLastError(); goto error; } REGISTER_HANDLE_REQ(loop, handle, req); handle->reqs_pending++; return; } - errorno = GetLastError(); + err = GetLastError(); goto error; } assert(pipeHandle != INVALID_HANDLE_VALUE); if (uv_set_pipe_handle(loop, (uv_pipe_t*) req->handle, pipeHandle, duplex_flags)) { - errorno = GetLastError(); + err = GetLastError(); goto error; } SET_REQ_SUCCESS(req); uv_insert_pending_req(loop, (uv_req_t*) req); @@ -598,11 +592,11 @@ if (pipeHandle != INVALID_HANDLE_VALUE) { CloseHandle(pipeHandle); } /* Make this req pending reporting an error. */ - SET_REQ_ERROR(req, errorno); + SET_REQ_ERROR(req, err); uv_insert_pending_req(loop, (uv_req_t*) req); handle->reqs_pending++; REGISTER_HANDLE_REQ(loop, handle, req); return; } @@ -723,12 +717,11 @@ uv_pipe_accept_t* req; if (server->ipc) { if (!server->pending_ipc_info.socket_info) { /* No valid pending sockets. */ - uv__set_sys_error(loop, WSAEWOULDBLOCK); - return -1; + return WSAEWOULDBLOCK; } return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info, server->pending_ipc_info.tcp_connection); } else { @@ -738,12 +731,11 @@ /* accepted. */ req = server->pending_accepts; if (!req) { /* No valid connections found, so we error out. */ - uv__set_sys_error(loop, WSAEWOULDBLOCK); - return -1; + return WSAEWOULDBLOCK; } /* Initialize the client handle and copy the pipeHandle to the client */ uv_pipe_connection_init(pipe_client); pipe_client->handle = req->pipeHandle; @@ -771,22 +763,19 @@ if (handle->flags & UV_HANDLE_LISTENING) { handle->connection_cb = cb; } if (!(handle->flags & UV_HANDLE_BOUND)) { - uv__set_artificial_error(loop, UV_EINVAL); - return -1; + return WSAEINVAL; } if (handle->flags & UV_HANDLE_READING) { - uv__set_artificial_error(loop, UV_EISCONN); - return -1; + return WSAEISCONN; } if (!(handle->flags & UV_HANDLE_PIPESERVER)) { - uv__set_artificial_error(loop, UV_ENOTSUP); - return -1; + return ERROR_NOT_SUPPORTED; } handle->flags |= UV_HANDLE_LISTENING; INCREASE_ACTIVE_COUNT(loop, handle); handle->connection_cb = cb; @@ -1042,26 +1031,25 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { + int err; int result; uv_tcp_t* tcp_send_handle; uv_write_t* ipc_header_req; uv_ipc_frame_uv_stream ipc_frame; if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) { - uv__set_artificial_error(loop, UV_ENOTSUP); - return -1; + return ERROR_NOT_SUPPORTED; } /* Only TCP handles are supported for sharing. */ if (send_handle && ((send_handle->type != UV_TCP) || (!(send_handle->flags & UV_HANDLE_BOUND) && !(send_handle->flags & UV_HANDLE_CONNECTION)))) { - uv__set_artificial_error(loop, UV_ENOTSUP); - return -1; + return ERROR_NOT_SUPPORTED; } assert(handle->handle != INVALID_HANDLE_VALUE); uv_req_init(loop, (uv_req_t*) req); @@ -1079,13 +1067,14 @@ /* Use the IPC framing protocol. */ if (send_handle) { tcp_send_handle = (uv_tcp_t*)send_handle; - if (uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid, - &ipc_frame.socket_info)) { - return -1; + err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid, + &ipc_frame.socket_info); + if (err) { + return err; } ipc_frame.header.flags |= UV_IPC_TCP_SERVER; if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) { ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION; @@ -1126,30 +1115,41 @@ } /* Write the header or the whole frame. */ memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped)); + /* Using overlapped IO, but wait for completion before returning. + This write is blocking because ipc_frame is on stack. */ + ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL); + if (!ipc_header_req->overlapped.hEvent) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + result = WriteFile(handle->handle, &ipc_frame, ipc_frame.header.flags & UV_IPC_TCP_SERVER ? sizeof(ipc_frame) : sizeof(ipc_frame.header), NULL, &ipc_header_req->overlapped); if (!result && GetLastError() != ERROR_IO_PENDING) { - uv__set_sys_error(loop, GetLastError()); - return -1; + err = GetLastError(); + CloseHandle(ipc_header_req->overlapped.hEvent); + return err; } - if (result) { - /* Request completed immediately. */ - ipc_header_req->queued_bytes = 0; - } else { - /* Request queued by the kernel. */ - ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ? - sizeof(ipc_frame) : sizeof(ipc_frame.header); - handle->write_queue_size += ipc_header_req->queued_bytes; + if (!result) { + /* Request not completed immediately. Wait for it.*/ + if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) != + WAIT_OBJECT_0) { + err = GetLastError(); + CloseHandle(ipc_header_req->overlapped.hEvent); + return err; + } } + ipc_header_req->queued_bytes = 0; + CloseHandle(ipc_header_req->overlapped.hEvent); + ipc_header_req->overlapped.hEvent = NULL; REGISTER_HANDLE_REQ(loop, handle, ipc_header_req); handle->reqs_pending++; handle->write_reqs_pending++; @@ -1157,30 +1157,89 @@ if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { return 0; } } - if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + if ((handle->flags & + (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) == + (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) { + DWORD bytes; + result = WriteFile(handle->handle, + bufs[0].base, + bufs[0].len, + &bytes, + NULL); + + if (!result) { + return err; + } else { + /* Request completed immediately. */ + req->queued_bytes = 0; + } + + REGISTER_HANDLE_REQ(loop, handle, req); + handle->reqs_pending++; + handle->write_reqs_pending++; + POST_COMPLETION_FOR_REQ(loop, req); + return 0; + } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { req->write_buffer = bufs[0]; uv_insert_non_overlapped_write_req(handle, req); if (handle->write_reqs_pending == 0) { uv_queue_non_overlapped_write(handle); } /* Request queued by the kernel. */ req->queued_bytes = uv_count_bufs(bufs, bufcnt); handle->write_queue_size += req->queued_bytes; + } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) { + /* Using overlapped IO, but wait for completion before returning */ + req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL); + if (!req->overlapped.hEvent) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + + result = WriteFile(handle->handle, + bufs[0].base, + bufs[0].len, + NULL, + &req->overlapped); + + if (!result && GetLastError() != ERROR_IO_PENDING) { + err = GetLastError(); + CloseHandle(req->overlapped.hEvent); + return err; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) != + WAIT_OBJECT_0) { + err = GetLastError(); + CloseHandle(ipc_header_req->overlapped.hEvent); + return uv_translate_sys_error(err); + } + } + CloseHandle(req->overlapped.hEvent); + + REGISTER_HANDLE_REQ(loop, handle, req); + handle->reqs_pending++; + handle->write_reqs_pending++; + POST_COMPLETION_FOR_REQ(loop, req); + return 0; } else { result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, NULL, &req->overlapped); if (!result && GetLastError() != ERROR_IO_PENDING) { - uv__set_sys_error(loop, GetLastError()); - return -1; + return GetLastError(); } if (result) { /* Request completed immediately. */ req->queued_bytes = 0; @@ -1196,12 +1255,11 @@ uv_fatal_error(GetLastError(), "CreateEvent"); } if (!RegisterWaitForSingleObject(&req->wait_handle, req->overlapped.hEvent, post_completion_write_wait, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD)) { - uv__set_sys_error(loop, GetLastError()); - return -1; + return GetLastError(); } } } REGISTER_HANDLE_REQ(loop, handle, req); @@ -1219,12 +1277,11 @@ int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { if (!handle->ipc) { - uv__set_artificial_error(loop, UV_EINVAL); - return -1; + return WSAEINVAL; } return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb); } @@ -1236,15 +1293,14 @@ eof_timer_destroy(handle); handle->flags &= ~UV_HANDLE_READABLE; uv_read_stop((uv_stream_t*) handle); - uv__set_artificial_error(loop, UV_EOF); if (handle->read2_cb) { - handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE); + handle->read2_cb(handle, UV_EOF, uv_null_buf_, UV_UNKNOWN_HANDLE); } else { - handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + handle->read_cb((uv_stream_t*) handle, UV_EOF, uv_null_buf_); } } static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, @@ -1253,15 +1309,17 @@ /* so discard it. */ eof_timer_destroy(handle); uv_read_stop((uv_stream_t*) handle); - uv__set_sys_error(loop, error); if (handle->read2_cb) { - handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE); + handle->read2_cb(handle, + uv_translate_sys_error(error), + buf, + UV_UNKNOWN_HANDLE); } else { - handle->read_cb((uv_stream_t*)handle, -1, buf); + handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), buf); } } static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, @@ -1370,11 +1428,19 @@ avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes); } } buf = handle->alloc_cb((uv_handle_t*) handle, avail); - assert(buf.len > 0); + if (buf.len == 0) { + if (handle->read2_cb) { + handle->read2_cb(handle, UV_ENOBUFS, buf, UV_UNKNOWN_HANDLE); + } else if (handle->read_cb) { + handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, buf); + } + break; + } + assert(buf.base != NULL); if (ReadFile(handle->handle, buf.base, buf.len, &bytes, @@ -1420,10 +1486,12 @@ } void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, uv_write_t* req) { + int err; + assert(handle->type == UV_NAMED_PIPE); assert(handle->write_queue_size >= req->queued_bytes); handle->write_queue_size -= req->queued_bytes; @@ -1446,16 +1514,12 @@ } else { free(req); } } else { if (req->cb) { - if (!REQ_SUCCESS(req)) { - uv__set_sys_error(loop, GET_REQ_ERROR(req)); - ((uv_write_cb)req->cb)(req, -1); - } else { - ((uv_write_cb)req->cb)(req, 0); - } + err = GET_REQ_ERROR(req); + req->cb(req, uv_translate_sys_error(err)); } } handle->write_reqs_pending--; @@ -1502,22 +1566,24 @@ } void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, uv_connect_t* req) { + int err; + assert(handle->type == UV_NAMED_PIPE); UNREGISTER_HANDLE_REQ(loop, handle, req); if (req->cb) { + err = 0; if (REQ_SUCCESS(req)) { uv_pipe_connection_init(handle); - ((uv_connect_cb)req->cb)(req, 0); } else { - uv__set_sys_error(loop, GET_REQ_ERROR(req)); - ((uv_connect_cb)req->cb)(req, -1); + err = GET_REQ_ERROR(req); } + req->cb(req, uv_translate_sys_error(err)); } DECREASE_PENDING_REQ_COUNT(handle); } @@ -1526,20 +1592,26 @@ uv_shutdown_t* req) { assert(handle->type == UV_NAMED_PIPE); UNREGISTER_HANDLE_REQ(loop, handle, req); - /* Initialize and optionally start the eof timer. */ - /* This makes no sense if we've already seen EOF. */ if (handle->flags & UV_HANDLE_READABLE) { + /* Initialize and optionally start the eof timer. Only do this if the */ + /* pipe is readable and we haven't seen EOF come in ourselves. */ eof_timer_init(handle); /* If reading start the timer right now. */ /* Otherwise uv_pipe_queue_read will start it. */ if (handle->flags & UV_HANDLE_READ_PENDING) { eof_timer_start(handle); } + + } else { + /* This pipe is not readable. We can just close it to let the other end */ + /* know that we're done writing. */ + CloseHandle(handle->handle); + handle->handle = INVALID_HANDLE_VALUE; } if (req->cb) { req->cb(req, 0); } @@ -1637,11 +1709,10 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { HANDLE os_handle = (HANDLE)_get_osfhandle(file); if (os_handle == INVALID_HANDLE_VALUE || uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) { - uv__set_sys_error(pipe->loop, WSAEINVAL); - return -1; + return UV_EINVAL; } uv_pipe_connection_init(pipe); pipe->handle = os_handle; pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;