ext/noderb_extension/libuv/src/win/pipe.c in noderb-0.0.3 vs ext/noderb_extension/libuv/src/win/pipe.c in noderb-0.0.4

- old
+ new

@@ -54,13 +54,13 @@ int uv_pipe_init_with_handle(uv_pipe_t* handle, HANDLE pipeHandle) { int err = uv_pipe_init(handle); if (!err) { - /* + /* * At this point we don't know whether the pipe will be used as a client - * or a server. So, we assume that it will be a client until + * or a server. So, we assume that it will be a client until * uv_listen is called. */ handle->handle = pipeHandle; handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; } @@ -142,27 +142,96 @@ return 0; } +static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) { + int errno; + uv_pipe_t* handle; + uv_shutdown_t* req; + + req = (uv_shutdown_t*) parameter; + assert(req); + handle = (uv_pipe_t*) req->handle; + assert(handle); + + FlushFileBuffers(handle->handle); + + /* Post completed */ + if (!PostQueuedCompletionStatus(LOOP->iocp, + 0, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } + + return 0; +} + + void uv_pipe_endgame(uv_pipe_t* handle) { uv_err_t err; int status; unsigned int uv_alloced; + DWORD result; + uv_shutdown_t* req; + NTSTATUS nt_status; + IO_STATUS_BLOCK io_status; + FILE_PIPE_LOCAL_INFORMATION pipe_info; + if (handle->flags & UV_HANDLE_SHUTTING && !(handle->flags & UV_HANDLE_SHUT) && handle->write_reqs_pending == 0) { - close_pipe(handle, &status, &err); + req = handle->shutdown_req; - if (handle->shutdown_req->cb) { - if (status == -1) { - LOOP->last_error = err; + /* Try to avoid flushing the pipe buffer in the thread pool. */ + nt_status = pNtQueryInformationFile(handle->handle, + &io_status, + &pipe_info, + sizeof pipe_info, + FilePipeLocalInformation); + + if (nt_status != STATUS_SUCCESS) { + /* Failure */ + handle->flags &= ~UV_HANDLE_SHUTTING; + if (req->cb) { + uv_set_sys_error(pRtlNtStatusToDosError(nt_status)); + req->cb(req, -1); } - handle->shutdown_req->cb(handle->shutdown_req, status); + DECREASE_PENDING_REQ_COUNT(handle); + return; } - handle->reqs_pending--; + + if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) { + /* Short-circuit, no need to call FlushFileBuffers. */ + handle->flags |= UV_HANDLE_SHUT; + if (req->cb) { + req->cb(req, 0); + } + DECREASE_PENDING_REQ_COUNT(handle); + return; + } + + /* Run FlushFileBuffers in the thhead pool. */ + result = QueueUserWorkItem(pipe_shutdown_thread_proc, + req, + WT_EXECUTELONGFUNCTION); + if (result) { + /* Mark the handle as shut now to avoid going through this again. */ + handle->flags |= UV_HANDLE_SHUT; + + } else { + /* Failure. */ + handle->flags &= ~UV_HANDLE_SHUTTING; + if (req->cb) { + uv_set_sys_error(GetLastError()); + req->cb(req, -1); + } + DECREASE_PENDING_REQ_COUNT(handle); + return; + } } if (handle->flags & UV_HANDLE_CLOSING && handle->reqs_pending == 0) { assert(!(handle->flags & UV_HANDLE_CLOSED)); @@ -296,10 +365,12 @@ NULL); if (pipeHandle != INVALID_HANDLE_VALUE) { break; } + + SwitchToThread(); } if (pipeHandle != INVALID_HANDLE_VALUE && !uv_set_pipe_handle(handle, pipeHandle)) { handle->handle = pipeHandle; req->error = uv_ok_; @@ -315,12 +386,10 @@ 0, &req->overlapped)) { uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); } - handle->reqs_pending++; - return 0; } int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, @@ -361,10 +430,12 @@ if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) { errno = GetLastError(); goto error; } + handle->reqs_pending++; + return 0; } errno = GetLastError(); goto error; @@ -516,11 +587,11 @@ if (handle->flags & UV_HANDLE_BIND_ERROR) { uv_set_error(UV_EINVAL, 0); return -1; } - if (!(handle->flags & UV_HANDLE_BOUND) && + if (!(handle->flags & UV_HANDLE_BOUND) && !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { uv_set_error(UV_EINVAL, 0); return -1; } @@ -528,11 +599,11 @@ handle->flags & UV_HANDLE_READING) { uv_set_error(UV_EALREADY, 0); return -1; } - if (!(handle->flags & UV_HANDLE_PIPESERVER) && + if (!(handle->flags & UV_HANDLE_PIPESERVER) && !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { uv_set_error(UV_ENOTSUP, 0); return -1; } @@ -716,34 +787,27 @@ buf.len = 0; handle->read_cb((uv_stream_t*)handle, -1, buf); break; } - /* TODO: do we need to check avail > 0? */ + if (avail == 0) { + /* There is nothing to read after all. */ + break; + } buf = handle->alloc_cb((uv_stream_t*)handle, avail); assert(buf.len > 0); if (ReadFile(handle->handle, buf.base, buf.len, &bytes, NULL)) { - if (bytes > 0) { - /* Successful read */ - handle->read_cb((uv_stream_t*)handle, bytes, buf); - /* Read again only if bytes == buf.len */ - if (bytes <= buf.len) { - break; - } - } else { - /* Connection closed */ - handle->flags &= ~UV_HANDLE_READING; - handle->flags |= UV_HANDLE_EOF; - LOOP->last_error.code = UV_EOF; - LOOP->last_error.sys_errno_ = ERROR_SUCCESS; - handle->read_cb((uv_stream_t*)handle, -1, buf); + /* Successful read */ + handle->read_cb((uv_stream_t*)handle, bytes, buf); + /* Read again only if bytes == buf.len */ + if (bytes <= buf.len) { break; } } else { /* Ouch! serious error. */ uv_set_sys_error(GetLastError()); @@ -820,9 +884,23 @@ ((uv_connect_cb)req->cb)(req, 0); } else { LOOP->last_error = req->error; ((uv_connect_cb)req->cb)(req, -1); } + } + + DECREASE_PENDING_REQ_COUNT(handle); +} + + +void uv_process_pipe_shutdown_req(uv_pipe_t* handle, uv_shutdown_t* req) { + assert(handle->type == UV_NAMED_PIPE); + + CloseHandle(handle->handle); + handle->handle = INVALID_HANDLE_VALUE; + + if (req->cb) { + req->cb(req, 0); } DECREASE_PENDING_REQ_COUNT(handle); }