ext/noderb_extension/libuv/src/win/pipe.c in noderb-0.0.3 vs ext/noderb_extension/libuv/src/win/pipe.c in noderb-0.0.4
- old
+ new
@@ -54,13 +54,13 @@
int uv_pipe_init_with_handle(uv_pipe_t* handle, HANDLE pipeHandle) {
int err = uv_pipe_init(handle);
if (!err) {
- /*
+ /*
* At this point we don't know whether the pipe will be used as a client
- * or a server. So, we assume that it will be a client until
+ * or a server. So, we assume that it will be a client until
* uv_listen is called.
*/
handle->handle = pipeHandle;
handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE;
}
@@ -142,27 +142,96 @@
return 0;
}
+static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
+ int errno;
+ uv_pipe_t* handle;
+ uv_shutdown_t* req;
+
+ req = (uv_shutdown_t*) parameter;
+ assert(req);
+ handle = (uv_pipe_t*) req->handle;
+ assert(handle);
+
+ FlushFileBuffers(handle->handle);
+
+ /* Post completed */
+ if (!PostQueuedCompletionStatus(LOOP->iocp,
+ 0,
+ 0,
+ &req->overlapped)) {
+ uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
+ }
+
+ return 0;
+}
+
+
void uv_pipe_endgame(uv_pipe_t* handle) {
uv_err_t err;
int status;
unsigned int uv_alloced;
+ DWORD result;
+ uv_shutdown_t* req;
+ NTSTATUS nt_status;
+ IO_STATUS_BLOCK io_status;
+ FILE_PIPE_LOCAL_INFORMATION pipe_info;
+
if (handle->flags & UV_HANDLE_SHUTTING &&
!(handle->flags & UV_HANDLE_SHUT) &&
handle->write_reqs_pending == 0) {
- close_pipe(handle, &status, &err);
+ req = handle->shutdown_req;
- if (handle->shutdown_req->cb) {
- if (status == -1) {
- LOOP->last_error = err;
+ /* Try to avoid flushing the pipe buffer in the thread pool. */
+ nt_status = pNtQueryInformationFile(handle->handle,
+ &io_status,
+ &pipe_info,
+ sizeof pipe_info,
+ FilePipeLocalInformation);
+
+ if (nt_status != STATUS_SUCCESS) {
+ /* Failure */
+ handle->flags &= ~UV_HANDLE_SHUTTING;
+ if (req->cb) {
+ uv_set_sys_error(pRtlNtStatusToDosError(nt_status));
+ req->cb(req, -1);
}
- handle->shutdown_req->cb(handle->shutdown_req, status);
+ DECREASE_PENDING_REQ_COUNT(handle);
+ return;
}
- handle->reqs_pending--;
+
+ if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
+ /* Short-circuit, no need to call FlushFileBuffers. */
+ handle->flags |= UV_HANDLE_SHUT;
+ if (req->cb) {
+ req->cb(req, 0);
+ }
+ DECREASE_PENDING_REQ_COUNT(handle);
+ return;
+ }
+
+ /* Run FlushFileBuffers in the thhead pool. */
+ result = QueueUserWorkItem(pipe_shutdown_thread_proc,
+ req,
+ WT_EXECUTELONGFUNCTION);
+ if (result) {
+ /* Mark the handle as shut now to avoid going through this again. */
+ handle->flags |= UV_HANDLE_SHUT;
+
+ } else {
+ /* Failure. */
+ handle->flags &= ~UV_HANDLE_SHUTTING;
+ if (req->cb) {
+ uv_set_sys_error(GetLastError());
+ req->cb(req, -1);
+ }
+ DECREASE_PENDING_REQ_COUNT(handle);
+ return;
+ }
}
if (handle->flags & UV_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
@@ -296,10 +365,12 @@
NULL);
if (pipeHandle != INVALID_HANDLE_VALUE) {
break;
}
+
+ SwitchToThread();
}
if (pipeHandle != INVALID_HANDLE_VALUE && !uv_set_pipe_handle(handle, pipeHandle)) {
handle->handle = pipeHandle;
req->error = uv_ok_;
@@ -315,12 +386,10 @@
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
- handle->reqs_pending++;
-
return 0;
}
int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
@@ -361,10 +430,12 @@
if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) {
errno = GetLastError();
goto error;
}
+ handle->reqs_pending++;
+
return 0;
}
errno = GetLastError();
goto error;
@@ -516,11 +587,11 @@
if (handle->flags & UV_HANDLE_BIND_ERROR) {
uv_set_error(UV_EINVAL, 0);
return -1;
}
- if (!(handle->flags & UV_HANDLE_BOUND) &&
+ if (!(handle->flags & UV_HANDLE_BOUND) &&
!(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) {
uv_set_error(UV_EINVAL, 0);
return -1;
}
@@ -528,11 +599,11 @@
handle->flags & UV_HANDLE_READING) {
uv_set_error(UV_EALREADY, 0);
return -1;
}
- if (!(handle->flags & UV_HANDLE_PIPESERVER) &&
+ if (!(handle->flags & UV_HANDLE_PIPESERVER) &&
!(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) {
uv_set_error(UV_ENOTSUP, 0);
return -1;
}
@@ -716,34 +787,27 @@
buf.len = 0;
handle->read_cb((uv_stream_t*)handle, -1, buf);
break;
}
- /* TODO: do we need to check avail > 0? */
+ if (avail == 0) {
+ /* There is nothing to read after all. */
+ break;
+ }
buf = handle->alloc_cb((uv_stream_t*)handle, avail);
assert(buf.len > 0);
if (ReadFile(handle->handle,
buf.base,
buf.len,
&bytes,
NULL)) {
- if (bytes > 0) {
- /* Successful read */
- handle->read_cb((uv_stream_t*)handle, bytes, buf);
- /* Read again only if bytes == buf.len */
- if (bytes <= buf.len) {
- break;
- }
- } else {
- /* Connection closed */
- handle->flags &= ~UV_HANDLE_READING;
- handle->flags |= UV_HANDLE_EOF;
- LOOP->last_error.code = UV_EOF;
- LOOP->last_error.sys_errno_ = ERROR_SUCCESS;
- handle->read_cb((uv_stream_t*)handle, -1, buf);
+ /* Successful read */
+ handle->read_cb((uv_stream_t*)handle, bytes, buf);
+ /* Read again only if bytes == buf.len */
+ if (bytes <= buf.len) {
break;
}
} else {
/* Ouch! serious error. */
uv_set_sys_error(GetLastError());
@@ -820,9 +884,23 @@
((uv_connect_cb)req->cb)(req, 0);
} else {
LOOP->last_error = req->error;
((uv_connect_cb)req->cb)(req, -1);
}
+ }
+
+ DECREASE_PENDING_REQ_COUNT(handle);
+}
+
+
+void uv_process_pipe_shutdown_req(uv_pipe_t* handle, uv_shutdown_t* req) {
+ assert(handle->type == UV_NAMED_PIPE);
+
+ CloseHandle(handle->handle);
+ handle->handle = INVALID_HANDLE_VALUE;
+
+ if (req->cb) {
+ req->cb(req, 0);
}
DECREASE_PENDING_REQ_COUNT(handle);
}