ext/libuv/src/win/pipe.c in libuv-1.0.3 vs ext/libuv/src/win/pipe.c in libuv-1.1.0

- old
+ new

@@ -99,10 +99,11 @@ handle->remaining_ipc_rawdata_bytes = 0; QUEUE_INIT(&handle->pending_ipc_info.queue); handle->pending_ipc_info.queue_len = 0; handle->ipc = ipc; handle->non_overlapped_writes_tail = NULL; + handle->readfile_thread = NULL; uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); return 0; } @@ -110,10 +111,16 @@ static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); handle->read_req.data = handle; handle->eof_timer = NULL; + assert(!(handle->flags & UV_HANDLE_PIPESERVER)); + if (pCancelSynchronousIo && + handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + uv_mutex_init(&handle->readfile_mutex); + handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE; + } } static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) { HANDLE pipeHandle; @@ -319,10 +326,15 @@ NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; uv__ipc_queue_item_t* item; + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE; + uv_mutex_destroy(&handle->readfile_mutex); + } + if ((handle->flags & UV_HANDLE_CONNECTION) && handle->shutdown_req != NULL && handle->write_reqs_pending == 0) { req = handle->shutdown_req; @@ -656,16 +668,53 @@ REGISTER_HANDLE_REQ(loop, handle, req); return; } +void uv__pipe_pause_read(uv_pipe_t* handle) { + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + /* Pause the ReadFile task briefly, to work + around the Windows kernel bug that causes + any access to a NamedPipe to deadlock if + any process has called ReadFile */ + HANDLE h; + uv_mutex_lock(&handle->readfile_mutex); + h = handle->readfile_thread; + while (h) { + /* spinlock: we expect this to finish quickly, + or we are probably about to deadlock anyways + (in the kernel), so it doesn't matter */ + pCancelSynchronousIo(h); + SwitchToThread(); /* yield thread control briefly */ + h = handle->readfile_thread; + } + } +} + + +void uv__pipe_unpause_read(uv_pipe_t* handle) { + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + uv_mutex_unlock(&handle->readfile_mutex); + } +} + + +void uv__pipe_stop_read(uv_pipe_t* handle) { + handle->flags &= ~UV_HANDLE_READING; + uv__pipe_pause_read((uv_pipe_t*)handle); + uv__pipe_unpause_read((uv_pipe_t*)handle); +} + + /* Cleans up uv_pipe_t (server or connection) and all resources associated */ /* with it. */ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { int i; HANDLE pipeHandle; + uv__pipe_stop_read(handle); + if (handle->name) { free(handle->name); handle->name = NULL; } @@ -687,10 +736,11 @@ if ((handle->flags & UV_HANDLE_CONNECTION) && handle->handle != INVALID_HANDLE_VALUE) { CloseHandle(handle->handle); handle->handle = INVALID_HANDLE_VALUE; } + } void uv_pipe_close(uv_loop_t* loop, uv_pipe_t* handle) { if (handle->flags & UV_HANDLE_READING) { @@ -865,23 +915,65 @@ int result; DWORD bytes; uv_read_t* req = (uv_read_t*) parameter; uv_pipe_t* handle = (uv_pipe_t*) req->data; uv_loop_t* loop = handle->loop; + HANDLE hThread = NULL; + DWORD err; + uv_mutex_t *m = &handle->readfile_mutex; assert(req != NULL); assert(req->type == UV_READ); assert(handle->type == UV_NAMED_PIPE); + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */ + if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), + GetCurrentProcess(), &hThread, + 0, TRUE, DUPLICATE_SAME_ACCESS)) { + handle->readfile_thread = hThread; + } else { + hThread = NULL; + } + uv_mutex_unlock(m); + } +restart_readfile: result = ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL); + if (!result) { + err = GetLastError(); + if (err == ERROR_OPERATION_ABORTED && + handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + if (handle->flags & UV_HANDLE_READING) { + /* just a brief break to do something else */ + handle->readfile_thread = NULL; + /* resume after it is finished */ + uv_mutex_lock(m); + handle->readfile_thread = hThread; + uv_mutex_unlock(m); + goto restart_readfile; + } else { + result = 1; /* successfully stopped reading */ + } + } + } + if (hThread) { + assert(hThread == handle->readfile_thread); + /* mutex does not control clearing readfile_thread */ + handle->readfile_thread = NULL; + uv_mutex_lock(m); + /* only when we hold the mutex lock is it safe to + open or close the handle */ + CloseHandle(hThread); + uv_mutex_unlock(m); + } if (!result) { - SET_REQ_ERROR(req, GetLastError()); + SET_REQ_ERROR(req, err); } POST_COMPLETION_FOR_REQ(loop, req); return 0; } @@ -1834,21 +1926,24 @@ if (handle->handle == INVALID_HANDLE_VALUE) { *len = 0; return UV_EINVAL; } + uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */ + nt_status = pNtQueryInformationFile(handle->handle, &io_status, &tmp_name_info, sizeof tmp_name_info, FileNameInformation); if (nt_status == STATUS_BUFFER_OVERFLOW) { name_size = sizeof(*name_info) + tmp_name_info.FileNameLength; name_info = malloc(name_size); if (!name_info) { *len = 0; - return UV_ENOMEM; + err = UV_ENOMEM; + goto cleanup; } nt_status = pNtQueryInformationFile(handle->handle, &io_status, name_info, @@ -1916,13 +2011,17 @@ addrlen += pipe_prefix_len; buf[addrlen++] = '\0'; *len = addrlen; - return 0; + err = 0; + goto cleanup; error: free(name_info); + +cleanup: + uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */ return err; } int uv_pipe_pending_count(uv_pipe_t* handle) {