ext/libuv/src/win/pipe.c in uvrb-0.1.4 vs ext/libuv/src/win/pipe.c in uvrb-0.2.0
- old
+ new
@@ -21,10 +21,11 @@
#include <assert.h>
#include <io.h>
#include <string.h>
#include <stdio.h>
+#include <stdlib.h>
#include "uv.h"
#include "internal.h"
#include "handle-inl.h"
#include "stream-inl.h"
@@ -155,15 +156,14 @@
return INVALID_HANDLE_VALUE;
}
-uv_err_t uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
+int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
char* name, size_t nameSize) {
HANDLE pipeHandle;
- int errorno;
- uv_err_t err;
+ int err;
char* ptr = (char*)handle;
for (;;) {
uv_unique_pipe_name(ptr, name, nameSize);
@@ -175,13 +175,12 @@
if (pipeHandle != INVALID_HANDLE_VALUE) {
/* No name collisions. We're done. */
break;
}
- errorno = GetLastError();
- if (errorno != ERROR_PIPE_BUSY && errorno != ERROR_ACCESS_DENIED) {
- err = uv__new_sys_error(errorno);
+ err = GetLastError();
+ if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
goto error;
}
/* Pipe name collision. Increment the pointer and try again. */
ptr++;
@@ -189,18 +188,18 @@
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR)handle,
0) == NULL) {
- err = uv__new_sys_error(GetLastError());
+ err = GetLastError();
goto error;
}
uv_pipe_connection_init(handle);
handle->handle = pipeHandle;
- return uv_ok_;
+ return 0;
error:
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
}
@@ -276,10 +275,11 @@
return 0;
}
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
+ int err;
DWORD result;
uv_shutdown_t* req;
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
@@ -295,12 +295,11 @@
if (handle->flags & UV__HANDLE_CLOSING) {
UNREGISTER_HANDLE_REQ(loop, handle, req);
/* Already closing. Cancel the shutdown. */
if (req->cb) {
- uv__set_artificial_error(loop, UV_ECANCELED);
- req->cb(req, -1);
+ req->cb(req, UV_ECANCELED);
}
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
@@ -316,12 +315,12 @@
/* Failure */
UNREGISTER_HANDLE_REQ(loop, handle, req);
handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
if (req->cb) {
- uv__set_sys_error(loop, pRtlNtStatusToDosError(nt_status));
- req->cb(req, -1);
+ err = pRtlNtStatusToDosError(nt_status);
+ req->cb(req, uv_translate_sys_error(err));
}
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
@@ -343,12 +342,12 @@
/* Failure. */
UNREGISTER_HANDLE_REQ(loop, handle, req);
handle->flags |= UV_HANDLE_WRITABLE; /* Questionable */
if (req->cb) {
- uv__set_sys_error(loop, GetLastError());
- req->cb(req, -1);
+ err = GetLastError();
+ req->cb(req, uv_translate_sys_error(err));
}
DECREASE_PENDING_REQ_COUNT(handle);
return;
}
@@ -394,21 +393,19 @@
/* Creates a pipe server. */
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
uv_loop_t* loop = handle->loop;
- int i, errorno, nameSize;
+ int i, err, nameSize;
uv_pipe_accept_t* req;
if (handle->flags & UV_HANDLE_BOUND) {
- uv__set_sys_error(loop, WSAEINVAL);
- return -1;
+ return UV_EINVAL;
}
if (!name) {
- uv__set_sys_error(loop, WSAEINVAL);
- return -1;
+ return UV_EINVAL;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
handle->pending_instances = default_pending_pipe_instances;
}
@@ -434,12 +431,11 @@
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
- uv__set_sys_error(loop, GetLastError());
- return -1;
+ return uv_translate_sys_error(GetLastError());
}
/*
* Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
* If this fails then there's already a pipe server for the given pipe name.
@@ -449,23 +445,21 @@
FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
if (handle->accept_reqs[0].pipeHandle == INVALID_HANDLE_VALUE) {
- errorno = GetLastError();
- if (errorno == ERROR_ACCESS_DENIED) {
- uv__set_error(loop, UV_EADDRINUSE, errorno);
- } else if (errorno == ERROR_PATH_NOT_FOUND || errorno == ERROR_INVALID_NAME) {
- uv__set_error(loop, UV_EACCES, errorno);
- } else {
- uv__set_sys_error(loop, errorno);
+ err = GetLastError();
+ if (err == ERROR_ACCESS_DENIED) {
+ err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
+ } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
+ err = WSAEACCES; /* Translates to UV_EACCES. */
}
goto error;
}
if (uv_set_pipe_handle(loop, handle, handle->accept_reqs[0].pipeHandle, 0)) {
- uv__set_sys_error(loop, GetLastError());
+ err = GetLastError();
goto error;
}
handle->pending_accepts = NULL;
handle->flags |= UV_HANDLE_PIPESERVER;
@@ -482,11 +476,11 @@
if (handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(handle->accept_reqs[0].pipeHandle);
handle->accept_reqs[0].pipeHandle = INVALID_HANDLE_VALUE;
}
- return -1;
+ return uv_translate_sys_error(err);
}
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
uv_loop_t* loop;
@@ -529,11 +523,11 @@
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
const char* name, uv_connect_cb cb) {
uv_loop_t* loop = handle->loop;
- int errorno, nameSize;
+ int err, nameSize;
HANDLE pipeHandle = INVALID_HANDLE_VALUE;
DWORD duplex_flags;
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_CONNECT;
@@ -546,42 +540,42 @@
if (!handle->name) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
if (!uv_utf8_to_utf16(name, handle->name, nameSize / sizeof(WCHAR))) {
- errorno = GetLastError();
+ err = GetLastError();
goto error;
}
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
if (pipeHandle == INVALID_HANDLE_VALUE) {
if (GetLastError() == ERROR_PIPE_BUSY) {
/* Wait for the server to make a pipe instance available. */
if (!QueueUserWorkItem(&pipe_connect_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
- errorno = GetLastError();
+ err = GetLastError();
goto error;
}
REGISTER_HANDLE_REQ(loop, handle, req);
handle->reqs_pending++;
return;
}
- errorno = GetLastError();
+ err = GetLastError();
goto error;
}
assert(pipeHandle != INVALID_HANDLE_VALUE);
if (uv_set_pipe_handle(loop,
(uv_pipe_t*) req->handle,
pipeHandle,
duplex_flags)) {
- errorno = GetLastError();
+ err = GetLastError();
goto error;
}
SET_REQ_SUCCESS(req);
uv_insert_pending_req(loop, (uv_req_t*) req);
@@ -598,11 +592,11 @@
if (pipeHandle != INVALID_HANDLE_VALUE) {
CloseHandle(pipeHandle);
}
/* Make this req pending reporting an error. */
- SET_REQ_ERROR(req, errorno);
+ SET_REQ_ERROR(req, err);
uv_insert_pending_req(loop, (uv_req_t*) req);
handle->reqs_pending++;
REGISTER_HANDLE_REQ(loop, handle, req);
return;
}
@@ -723,12 +717,11 @@
uv_pipe_accept_t* req;
if (server->ipc) {
if (!server->pending_ipc_info.socket_info) {
/* No valid pending sockets. */
- uv__set_sys_error(loop, WSAEWOULDBLOCK);
- return -1;
+ return WSAEWOULDBLOCK;
}
return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
server->pending_ipc_info.tcp_connection);
} else {
@@ -738,12 +731,11 @@
/* accepted. */
req = server->pending_accepts;
if (!req) {
/* No valid connections found, so we error out. */
- uv__set_sys_error(loop, WSAEWOULDBLOCK);
- return -1;
+ return WSAEWOULDBLOCK;
}
/* Initialize the client handle and copy the pipeHandle to the client */
uv_pipe_connection_init(pipe_client);
pipe_client->handle = req->pipeHandle;
@@ -771,22 +763,19 @@
if (handle->flags & UV_HANDLE_LISTENING) {
handle->connection_cb = cb;
}
if (!(handle->flags & UV_HANDLE_BOUND)) {
- uv__set_artificial_error(loop, UV_EINVAL);
- return -1;
+ return WSAEINVAL;
}
if (handle->flags & UV_HANDLE_READING) {
- uv__set_artificial_error(loop, UV_EISCONN);
- return -1;
+ return WSAEISCONN;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
- uv__set_artificial_error(loop, UV_ENOTSUP);
- return -1;
+ return ERROR_NOT_SUPPORTED;
}
handle->flags |= UV_HANDLE_LISTENING;
INCREASE_ACTIVE_COUNT(loop, handle);
handle->connection_cb = cb;
@@ -1042,26 +1031,25 @@
static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
uv_stream_t* send_handle, uv_write_cb cb) {
+ int err;
int result;
uv_tcp_t* tcp_send_handle;
uv_write_t* ipc_header_req;
uv_ipc_frame_uv_stream ipc_frame;
if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) {
- uv__set_artificial_error(loop, UV_ENOTSUP);
- return -1;
+ return ERROR_NOT_SUPPORTED;
}
/* Only TCP handles are supported for sharing. */
if (send_handle && ((send_handle->type != UV_TCP) ||
(!(send_handle->flags & UV_HANDLE_BOUND) &&
!(send_handle->flags & UV_HANDLE_CONNECTION)))) {
- uv__set_artificial_error(loop, UV_ENOTSUP);
- return -1;
+ return ERROR_NOT_SUPPORTED;
}
assert(handle->handle != INVALID_HANDLE_VALUE);
uv_req_init(loop, (uv_req_t*) req);
@@ -1079,13 +1067,14 @@
/* Use the IPC framing protocol. */
if (send_handle) {
tcp_send_handle = (uv_tcp_t*)send_handle;
- if (uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
- &ipc_frame.socket_info)) {
- return -1;
+ err = uv_tcp_duplicate_socket(tcp_send_handle, handle->ipc_pid,
+ &ipc_frame.socket_info);
+ if (err) {
+ return err;
}
ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
@@ -1126,30 +1115,41 @@
}
/* Write the header or the whole frame. */
memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
+ /* Using overlapped IO, but wait for completion before returning.
+ This write is blocking because ipc_frame is on stack. */
+ ipc_header_req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
+ if (!ipc_header_req->overlapped.hEvent) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+
result = WriteFile(handle->handle,
&ipc_frame,
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
sizeof(ipc_frame) : sizeof(ipc_frame.header),
NULL,
&ipc_header_req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
- uv__set_sys_error(loop, GetLastError());
- return -1;
+ err = GetLastError();
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ return err;
}
- if (result) {
- /* Request completed immediately. */
- ipc_header_req->queued_bytes = 0;
- } else {
- /* Request queued by the kernel. */
- ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
- sizeof(ipc_frame) : sizeof(ipc_frame.header);
- handle->write_queue_size += ipc_header_req->queued_bytes;
+ if (!result) {
+ /* Request not completed immediately. Wait for it.*/
+ if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
+ WAIT_OBJECT_0) {
+ err = GetLastError();
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ return err;
+ }
}
+ ipc_header_req->queued_bytes = 0;
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ ipc_header_req->overlapped.hEvent = NULL;
REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
handle->reqs_pending++;
handle->write_reqs_pending++;
@@ -1157,30 +1157,89 @@
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
return 0;
}
}
- if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
+ if ((handle->flags &
+ (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
+ (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
+ DWORD bytes;
+ result = WriteFile(handle->handle,
+ bufs[0].base,
+ bufs[0].len,
+ &bytes,
+ NULL);
+
+ if (!result) {
+ return err;
+ } else {
+ /* Request completed immediately. */
+ req->queued_bytes = 0;
+ }
+
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ handle->reqs_pending++;
+ handle->write_reqs_pending++;
+ POST_COMPLETION_FOR_REQ(loop, req);
+ return 0;
+ } else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = bufs[0];
uv_insert_non_overlapped_write_req(handle, req);
if (handle->write_reqs_pending == 0) {
uv_queue_non_overlapped_write(handle);
}
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->write_queue_size += req->queued_bytes;
+ } else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
+ /* Using overlapped IO, but wait for completion before returning */
+ req->overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
+ if (!req->overlapped.hEvent) {
+ uv_fatal_error(GetLastError(), "CreateEvent");
+ }
+
+ result = WriteFile(handle->handle,
+ bufs[0].base,
+ bufs[0].len,
+ NULL,
+ &req->overlapped);
+
+ if (!result && GetLastError() != ERROR_IO_PENDING) {
+ err = GetLastError();
+ CloseHandle(req->overlapped.hEvent);
+ return err;
+ }
+
+ if (result) {
+ /* Request completed immediately. */
+ req->queued_bytes = 0;
+ } else {
+ /* Request queued by the kernel. */
+ if (WaitForSingleObject(ipc_header_req->overlapped.hEvent, INFINITE) !=
+ WAIT_OBJECT_0) {
+ err = GetLastError();
+ CloseHandle(ipc_header_req->overlapped.hEvent);
+ return uv_translate_sys_error(err);
+ }
+ }
+ CloseHandle(req->overlapped.hEvent);
+
+ REGISTER_HANDLE_REQ(loop, handle, req);
+ handle->reqs_pending++;
+ handle->write_reqs_pending++;
+ POST_COMPLETION_FOR_REQ(loop, req);
+ return 0;
} else {
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
&req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
- uv__set_sys_error(loop, GetLastError());
- return -1;
+ return GetLastError();
}
if (result) {
/* Request completed immediately. */
req->queued_bytes = 0;
@@ -1196,12 +1255,11 @@
uv_fatal_error(GetLastError(), "CreateEvent");
}
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->overlapped.hEvent, post_completion_write_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
- uv__set_sys_error(loop, GetLastError());
- return -1;
+ return GetLastError();
}
}
}
REGISTER_HANDLE_REQ(loop, handle, req);
@@ -1219,12 +1277,11 @@
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) {
if (!handle->ipc) {
- uv__set_artificial_error(loop, UV_EINVAL);
- return -1;
+ return WSAEINVAL;
}
return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb);
}
@@ -1236,15 +1293,14 @@
eof_timer_destroy(handle);
handle->flags &= ~UV_HANDLE_READABLE;
uv_read_stop((uv_stream_t*) handle);
- uv__set_artificial_error(loop, UV_EOF);
if (handle->read2_cb) {
- handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE);
+ handle->read2_cb(handle, UV_EOF, uv_null_buf_, UV_UNKNOWN_HANDLE);
} else {
- handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_);
+ handle->read_cb((uv_stream_t*) handle, UV_EOF, uv_null_buf_);
}
}
static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
@@ -1253,15 +1309,17 @@
/* so discard it. */
eof_timer_destroy(handle);
uv_read_stop((uv_stream_t*) handle);
- uv__set_sys_error(loop, error);
if (handle->read2_cb) {
- handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE);
+ handle->read2_cb(handle,
+ uv_translate_sys_error(error),
+ buf,
+ UV_UNKNOWN_HANDLE);
} else {
- handle->read_cb((uv_stream_t*)handle, -1, buf);
+ handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), buf);
}
}
static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
@@ -1370,11 +1428,19 @@
avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
}
}
buf = handle->alloc_cb((uv_handle_t*) handle, avail);
- assert(buf.len > 0);
+ if (buf.len == 0) {
+ if (handle->read2_cb) {
+ handle->read2_cb(handle, UV_ENOBUFS, buf, UV_UNKNOWN_HANDLE);
+ } else if (handle->read_cb) {
+ handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, buf);
+ }
+ break;
+ }
+ assert(buf.base != NULL);
if (ReadFile(handle->handle,
buf.base,
buf.len,
&bytes,
@@ -1420,10 +1486,12 @@
}
void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_write_t* req) {
+ int err;
+
assert(handle->type == UV_NAMED_PIPE);
assert(handle->write_queue_size >= req->queued_bytes);
handle->write_queue_size -= req->queued_bytes;
@@ -1446,16 +1514,12 @@
} else {
free(req);
}
} else {
if (req->cb) {
- if (!REQ_SUCCESS(req)) {
- uv__set_sys_error(loop, GET_REQ_ERROR(req));
- ((uv_write_cb)req->cb)(req, -1);
- } else {
- ((uv_write_cb)req->cb)(req, 0);
- }
+ err = GET_REQ_ERROR(req);
+ req->cb(req, uv_translate_sys_error(err));
}
}
handle->write_reqs_pending--;
@@ -1502,22 +1566,24 @@
}
void uv_process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_connect_t* req) {
+ int err;
+
assert(handle->type == UV_NAMED_PIPE);
UNREGISTER_HANDLE_REQ(loop, handle, req);
if (req->cb) {
+ err = 0;
if (REQ_SUCCESS(req)) {
uv_pipe_connection_init(handle);
- ((uv_connect_cb)req->cb)(req, 0);
} else {
- uv__set_sys_error(loop, GET_REQ_ERROR(req));
- ((uv_connect_cb)req->cb)(req, -1);
+ err = GET_REQ_ERROR(req);
}
+ req->cb(req, uv_translate_sys_error(err));
}
DECREASE_PENDING_REQ_COUNT(handle);
}
@@ -1526,20 +1592,26 @@
uv_shutdown_t* req) {
assert(handle->type == UV_NAMED_PIPE);
UNREGISTER_HANDLE_REQ(loop, handle, req);
- /* Initialize and optionally start the eof timer. */
- /* This makes no sense if we've already seen EOF. */
if (handle->flags & UV_HANDLE_READABLE) {
+ /* Initialize and optionally start the eof timer. Only do this if the */
+ /* pipe is readable and we haven't seen EOF come in ourselves. */
eof_timer_init(handle);
/* If reading start the timer right now. */
/* Otherwise uv_pipe_queue_read will start it. */
if (handle->flags & UV_HANDLE_READ_PENDING) {
eof_timer_start(handle);
}
+
+ } else {
+ /* This pipe is not readable. We can just close it to let the other end */
+ /* know that we're done writing. */
+ CloseHandle(handle->handle);
+ handle->handle = INVALID_HANDLE_VALUE;
}
if (req->cb) {
req->cb(req, 0);
}
@@ -1637,11 +1709,10 @@
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
HANDLE os_handle = (HANDLE)_get_osfhandle(file);
if (os_handle == INVALID_HANDLE_VALUE ||
uv_set_pipe_handle(pipe->loop, pipe, os_handle, 0) == -1) {
- uv__set_sys_error(pipe->loop, WSAEINVAL);
- return -1;
+ return UV_EINVAL;
}
uv_pipe_connection_init(pipe);
pipe->handle = os_handle;
pipe->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;