ext/asyncengine/libuv/src/win/tcp.c in asyncengine-0.0.1.testing1 vs ext/asyncengine/libuv/src/win/tcp.c in asyncengine-0.0.2.alpha1

- old
+ new

@@ -20,12 +20,14 @@ */ #include <assert.h> #include "uv.h" -#include "../uv-common.h" #include "internal.h" +#include "handle-inl.h" +#include "stream-inl.h" +#include "req-inl.h" /* * Threshold of active tcp streams for which to preallocate tcp read buffers. * (Due to node slab allocator performing poorly under this pattern, @@ -89,16 +91,10 @@ if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) { uv__set_sys_error(loop, WSAGetLastError()); return -1; } - /* Make the socket non-inheritable */ - if (!SetHandleInformation((HANDLE)socket, HANDLE_FLAG_INHERIT, 0)) { - uv__set_sys_error(loop, GetLastError()); - return -1; - } - /* Associate it with the I/O completion port. */ /* Use uv_handle_t pointer as completion key. */ if (CreateIoCompletionPort((HANDLE)socket, loop->iocp, (ULONG_PTR)socket, @@ -141,16 +137,15 @@ return 0; } int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { - uv_stream_init(loop, (uv_stream_t*)handle); + uv_stream_init(loop, (uv_stream_t*) handle, UV_TCP); handle->accept_reqs = NULL; handle->pending_accepts = NULL; handle->socket = INVALID_SOCKET; - handle->type = UV_TCP; handle->reqs_pending = 0; handle->func_acceptex = NULL; handle->func_connectex = NULL; handle->processed_accepts = 0; @@ -168,38 +163,39 @@ if (handle->flags & UV_HANDLE_CONNECTION && handle->shutdown_req != NULL && handle->write_reqs_pending == 0) { + UNREGISTER_HANDLE_REQ(loop, handle, handle->shutdown_req); + if (handle->flags & UV_HANDLE_CLOSING) { status = -1; sys_error = WSAEINTR; } else if (shutdown(handle->socket, SD_SEND) != SOCKET_ERROR) { status = 0; handle->flags |= UV_HANDLE_SHUT; } else { status = -1; sys_error = WSAGetLastError(); } + if (handle->shutdown_req->cb) { if (status == -1) { uv__set_sys_error(loop, sys_error); } handle->shutdown_req->cb(handle->shutdown_req, status); } handle->shutdown_req = NULL; - - uv_unref(loop); DECREASE_PENDING_REQ_COUNT(handle); return; } if (handle->flags & UV_HANDLE_CLOSING && handle->reqs_pending == 0) { assert(!(handle->flags & UV_HANDLE_CLOSED)); - handle->flags |= UV_HANDLE_CLOSED; + uv__handle_stop(handle); if (!(handle->flags & UV_HANDLE_TCP_SOCKET_CLOSED)) { closesocket(handle->socket); handle->flags |= UV_HANDLE_TCP_SOCKET_CLOSED; } @@ -233,17 +229,12 @@ CloseHandle(handle->read_req.event_handle); handle->read_req.event_handle = NULL; } } - if (handle->close_cb) { - handle->close_cb((uv_handle_t*)handle); - } - + uv__handle_close(handle); loop->active_tcp_streams--; - - uv_unref(loop); } } static int uv__bind(uv_tcp_t* handle, @@ -258,10 +249,17 @@ if (sock == INVALID_SOCKET) { uv__set_sys_error(handle->loop, WSAGetLastError()); return -1; } + /* Make the socket non-inheritable */ + if (!SetHandleInformation((HANDLE) sock, HANDLE_FLAG_INHERIT, 0)) { + uv__set_sys_error(handle->loop, GetLastError()); + closesocket(sock); + return -1; + } + if (uv_tcp_set_socket(handle->loop, handle, sock, 0) == -1) { closesocket(sock); return -1; } } @@ -308,14 +306,14 @@ } } static void CALLBACK post_completion(void* context, BOOLEAN timed_out) { - uv_tcp_accept_t* req; + uv_req_t* req; uv_tcp_t* handle; - req = (uv_tcp_accept_t*) context; + req = (uv_req_t*) context; assert(req != NULL); handle = (uv_tcp_t*)req->data; assert(handle != NULL); assert(!timed_out); @@ -326,10 +324,29 @@ uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); } } +static void CALLBACK post_write_completion(void* context, BOOLEAN timed_out) { + uv_write_t* req; + uv_tcp_t* handle; + + req = (uv_write_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->handle; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { uv_loop_t* loop = handle->loop; BOOL success; DWORD bytes; SOCKET accept_socket; @@ -352,10 +369,19 @@ uv_insert_pending_req(loop, (uv_req_t*)req); handle->reqs_pending++; return; } + /* Make the socket non-inheritable */ + if (!SetHandleInformation((HANDLE) accept_socket, HANDLE_FLAG_INHERIT, 0)) { + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, (uv_req_t*)req); + handle->reqs_pending++; + closesocket(accept_socket); + return; + } + /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); if (handle->flags & UV_HANDLE_EMULATE_IOCP) { req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1); } @@ -379,11 +405,11 @@ req->accept_socket = accept_socket; handle->reqs_pending++; if (handle->flags & UV_HANDLE_EMULATE_IOCP && req->wait_handle == INVALID_HANDLE_VALUE && !RegisterWaitForSingleObject(&req->wait_handle, - req->overlapped.hEvent, post_completion, (void*) req, + req->event_handle, post_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD)) { SET_REQ_ERROR(req, GetLastError()); uv_insert_pending_req(loop, (uv_req_t*)req); handle->reqs_pending++; return; @@ -458,11 +484,11 @@ handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; if (handle->flags & UV_HANDLE_EMULATE_IOCP && req->wait_handle == INVALID_HANDLE_VALUE && !RegisterWaitForSingleObject(&req->wait_handle, - req->overlapped.hEvent, post_completion, (void*) req, + req->event_handle, post_completion, (void*) req, INFINITE, WT_EXECUTEINWAITTHREAD)) { SET_REQ_ERROR(req, GetLastError()); uv_insert_pending_req(loop, (uv_req_t*)req); } } else { @@ -479,10 +505,19 @@ unsigned int i, simultaneous_accepts; uv_tcp_accept_t* req; assert(backlog > 0); + if (handle->flags & UV_HANDLE_LISTENING) { + handle->connection_cb = cb; + } + + if (handle->flags & UV_HANDLE_READING) { + uv__set_artificial_error(loop, UV_EISCONN); + return -1; + } + if (handle->flags & UV_HANDLE_BIND_ERROR) { uv__set_sys_error(loop, handle->bind_error); return -1; } @@ -503,10 +538,11 @@ return -1; } handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; + INCREASE_ACTIVE_COUNT(loop, handle); simultaneous_accepts = handle->flags & UV_HANDLE_TCP_SINGLE_ACCEPT ? 1 : uv_simultaneous_server_accepts; if(!handle->accept_reqs) { @@ -621,10 +657,11 @@ } handle->flags |= UV_HANDLE_READING; handle->read_cb = read_cb; handle->alloc_cb = alloc_cb; + INCREASE_ACTIVE_COUNT(loop, handle); /* If reading was stopped and then started again, there could still be a */ /* read request pending. */ if (!(handle->flags & UV_HANDLE_READ_PENDING)) { if (handle->flags & UV_HANDLE_EMULATE_IOCP && @@ -681,16 +718,16 @@ &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ handle->reqs_pending++; - uv_ref(loop); + REGISTER_HANDLE_REQ(loop, handle, req); uv_insert_pending_req(loop, (uv_req_t*)req); } else if (UV_SUCCEEDED_WITH_IOCP(success)) { /* The req will be processed with IOCP. */ handle->reqs_pending++; - uv_ref(loop); + REGISTER_HANDLE_REQ(loop, handle, req); } else { uv__set_sys_error(loop, WSAGetLastError()); return -1; } @@ -742,15 +779,15 @@ &bytes, &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { handle->reqs_pending++; - uv_ref(loop); + REGISTER_HANDLE_REQ(loop, handle, req); uv_insert_pending_req(loop, (uv_req_t*)req); } else if (UV_SUCCEEDED_WITH_IOCP(success)) { handle->reqs_pending++; - uv_ref(loop); + REGISTER_HANDLE_REQ(loop, handle, req); } else { uv__set_sys_error(loop, WSAGetLastError()); return -1; } @@ -835,10 +872,11 @@ req->event_handle = CreateEvent(NULL, 0, 0, NULL); if (!req->event_handle) { uv_fatal_error(GetLastError(), "CreateEvent"); } req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1); + req->wait_handle = INVALID_HANDLE_VALUE; } result = WSASend(handle->socket, (WSABUF*)bufs, bufcnt, @@ -850,24 +888,23 @@ if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { /* Request completed immediately. */ req->queued_bytes = 0; handle->reqs_pending++; handle->write_reqs_pending++; + REGISTER_HANDLE_REQ(loop, handle, req); uv_insert_pending_req(loop, (uv_req_t*) req); - uv_ref(loop); } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { /* Request queued by the kernel. */ req->queued_bytes = uv_count_bufs(bufs, bufcnt); handle->reqs_pending++; handle->write_reqs_pending++; + REGISTER_HANDLE_REQ(loop, handle, req); handle->write_queue_size += req->queued_bytes; - uv_ref(loop); if (handle->flags & UV_HANDLE_EMULATE_IOCP && - req->wait_handle == INVALID_HANDLE_VALUE && !RegisterWaitForSingleObject(&req->wait_handle, - req->overlapped.hEvent, post_completion, (void*) req, - INFINITE, WT_EXECUTEINWAITTHREAD)) { + req->event_handle, post_write_completion, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE)) { SET_REQ_ERROR(req, GetLastError()); uv_insert_pending_req(loop, (uv_req_t*)req); } } else { /* Send failed due to an error. */ @@ -891,10 +928,11 @@ if (!REQ_SUCCESS(req)) { /* An error occurred doing the read. */ if ((handle->flags & UV_HANDLE_READING) || !(handle->flags & UV_HANDLE_ZERO_READ)) { handle->flags &= ~UV_HANDLE_READING; + DECREASE_ACTIVE_COUNT(loop, handle); buf = (handle->flags & UV_HANDLE_ZERO_READ) ? uv_buf_init(NULL, 0) : handle->read_buffer; err = GET_REQ_SOCK_ERROR(req); @@ -921,12 +959,16 @@ if (req->overlapped.InternalHigh < handle->read_buffer.len) { goto done; } } else { /* Connection closed */ - handle->flags &= ~UV_HANDLE_READING; + if (handle->flags & UV_HANDLE_READING) { + handle->flags &= ~UV_HANDLE_READING; + DECREASE_ACTIVE_COUNT(loop, handle); + } handle->flags |= UV_HANDLE_EOF; + uv__set_error(loop, UV_EOF, ERROR_SUCCESS); buf.base = 0; buf.len = 0; handle->read_cb((uv_stream_t*)handle, -1, handle->read_buffer); goto done; @@ -953,11 +995,13 @@ break; } } else { /* Connection closed */ handle->flags &= ~UV_HANDLE_READING; + DECREASE_ACTIVE_COUNT(loop, handle); handle->flags |= UV_HANDLE_EOF; + uv__set_error(loop, UV_EOF, ERROR_SUCCESS); handle->read_cb((uv_stream_t*)handle, -1, buf); break; } } else { @@ -965,20 +1009,22 @@ if (err == WSAEWOULDBLOCK) { /* Read buffer was completely empty, report a 0-byte read. */ uv__set_sys_error(loop, WSAEWOULDBLOCK); handle->read_cb((uv_stream_t*)handle, 0, buf); } else { + /* Ouch! serious error. */ + handle->flags &= ~UV_HANDLE_READING; + DECREASE_ACTIVE_COUNT(loop, handle); + if (err == WSAECONNABORTED) { - /* - * Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with Unix. - */ + /* Turn WSAECONNABORTED into UV_ECONNRESET to be consistent with */ + /* Unix. */ uv__set_error(loop, UV_ECONNRESET, err); } else { - /* Ouch! serious error. */ uv__set_sys_error(loop, err); } - handle->flags &= ~UV_HANDLE_READING; + handle->read_cb((uv_stream_t*)handle, -1, buf); } break; } } @@ -1000,18 +1046,18 @@ assert(handle->type == UV_TCP); assert(handle->write_queue_size >= req->queued_bytes); handle->write_queue_size -= req->queued_bytes; + UNREGISTER_HANDLE_REQ(loop, handle, req); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { if (req->wait_handle != INVALID_HANDLE_VALUE) { UnregisterWait(req->wait_handle); - req->wait_handle = INVALID_HANDLE_VALUE; } if (req->event_handle) { CloseHandle(req->event_handle); - req->event_handle = NULL; } } if (req->cb) { uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req)); @@ -1023,11 +1069,10 @@ handle->write_reqs_pending == 0) { uv_want_endgame(loop, (uv_handle_t*)handle); } DECREASE_PENDING_REQ_COUNT(handle); - uv_unref(loop); } void uv_process_tcp_accept_req(uv_loop_t* loop, uv_tcp_t* handle, uv_req_t* raw_req) { @@ -1040,10 +1085,11 @@ /* accepting connections and report this error to the connection */ /* callback. */ if (req->accept_socket == INVALID_SOCKET) { if (handle->flags & UV_HANDLE_LISTENING) { handle->flags &= ~UV_HANDLE_LISTENING; + DECREASE_ACTIVE_COUNT(loop, handle); if (handle->connection_cb) { uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req)); handle->connection_cb((uv_stream_t*)handle, -1); } } @@ -1077,32 +1123,31 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, uv_connect_t* req) { assert(handle->type == UV_TCP); - if (req->cb) { - if (REQ_SUCCESS(req)) { - if (setsockopt(handle->socket, - SOL_SOCKET, - SO_UPDATE_CONNECT_CONTEXT, - NULL, - 0) == 0) { - uv_connection_init((uv_stream_t*)handle); - loop->active_tcp_streams++; - ((uv_connect_cb)req->cb)(req, 0); - } else { - uv__set_sys_error(loop, WSAGetLastError()); - ((uv_connect_cb)req->cb)(req, -1); - } + UNREGISTER_HANDLE_REQ(loop, handle, req); + + if (REQ_SUCCESS(req)) { + if (setsockopt(handle->socket, + SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + NULL, + 0) == 0) { + uv_connection_init((uv_stream_t*)handle); + loop->active_tcp_streams++; + ((uv_connect_cb)req->cb)(req, 0); } else { - uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req)); + uv__set_sys_error(loop, WSAGetLastError()); ((uv_connect_cb)req->cb)(req, -1); } + } else { + uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req)); + ((uv_connect_cb)req->cb)(req, -1); } DECREASE_PENDING_REQ_COUNT(handle); - uv_unref(loop); } int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info, int tcp_connection) { @@ -1116,25 +1161,32 @@ if (socket == INVALID_SOCKET) { uv__set_sys_error(tcp->loop, WSAGetLastError()); return -1; } - tcp->flags |= UV_HANDLE_BOUND; - tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET; + if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0)) { + uv__set_sys_error(tcp->loop, GetLastError()); + closesocket(socket); + return -1; + } + if (uv_tcp_set_socket(tcp->loop, tcp, socket, 1) != 0) { + closesocket(socket); + return -1; + } + if (tcp_connection) { uv_connection_init((uv_stream_t*)tcp); } + tcp->flags |= UV_HANDLE_BOUND; + tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET; + if (socket_protocol_info->iAddressFamily == AF_INET6) { tcp->flags |= UV_HANDLE_IPV6; } - if (uv_tcp_set_socket(tcp->loop, tcp, socket, 1) != 0) { - return -1; - } - tcp->loop->active_tcp_streams++; return 0; } @@ -1170,15 +1222,14 @@ return 0; } - int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid, LPWSAPROTOCOL_INFOW protocol_info) { if (!(handle->flags & UV_HANDLE_CONNECTION)) { - /* + /* * We're about to share the socket with another process. Because * this is a listening socket, we assume that the other process will * be accepting connections on it. So, before sharing the socket * with another process, we call listen here in the parent process. */ @@ -1238,44 +1289,111 @@ return 0; } -void uv_tcp_close(uv_tcp_t* tcp) { +static int uv_tcp_try_cancel_io(uv_tcp_t* tcp) { + SOCKET socket = tcp->socket; int non_ifs_lsp; + + /* Check if we have any non-IFS LSPs stacked on top of TCP */ + non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 : + uv_tcp_non_ifs_lsp_ipv4; + + /* If there are non-ifs LSPs then try to obtain a base handle for the */ + /* socket. This will always fail on Windows XP/3k. */ + if (non_ifs_lsp) { + DWORD bytes; + if (WSAIoctl(socket, + SIO_BASE_HANDLE, + NULL, + 0, + &socket, + sizeof socket, + &bytes, + NULL, + NULL) != 0) { + /* Failed. We can't do CancelIo. */ + return -1; + } + } + + assert(socket != 0 && socket != INVALID_SOCKET); + + if (!CancelIo((HANDLE) socket)) { + return -1; + } + + /* It worked. */ + return 0; +} + + +void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) { int close_socket = 1; - /* - * In order for winsock to do a graceful close there must not be - * any pending reads. - */ if (tcp->flags & UV_HANDLE_READ_PENDING) { - /* Just do shutdown on non-shared sockets, which ensures graceful close. */ + /* In order for winsock to do a graceful close there must not be any */ + /* any pending reads, or the socket must be shut down for writing */ if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) { + /* Just do shutdown on non-shared sockets, which ensures graceful close. */ shutdown(tcp->socket, SD_SEND); tcp->flags |= UV_HANDLE_SHUT; + + } else if (uv_tcp_try_cancel_io(tcp) == 0) { + /* In case of a shared socket, we try to cancel all outstanding I/O, */ + /* If that works, don't close the socket yet - wait for the read req to */ + /* return and close the socket in uv_tcp_endgame. */ + close_socket = 0; + } else { - /* Check if we have any non-IFS LSPs stacked on top of TCP */ - non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 : - uv_tcp_non_ifs_lsp_ipv4; + /* When cancelling isn't possible - which could happen when an LSP is */ + /* present on an old Windows version, we will have to close the socket */ + /* with a read pending. That is not nice because trailing sent bytes */ + /* may not make it to the other side. */ + } - if (!non_ifs_lsp) { - /* - * Shared socket with no non-IFS LSPs, request to cancel pending I/O. - * The socket will be closed inside endgame. - */ - CancelIo((HANDLE)tcp->socket); - close_socket = 0; + } else if ((tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET) && + tcp->accept_reqs != NULL) { + /* Under normal circumstances closesocket() will ensure that all pending */ + /* accept reqs are canceled. However, when the socket is shared the */ + /* presence of another reference to the socket in another process will */ + /* keep the accept reqs going, so we have to ensure that these are */ + /* canceled. */ + if (uv_tcp_try_cancel_io(tcp) != 0) { + /* When cancellation is not possible, there is another option: we can */ + /* close the incoming sockets, which will also cancel the accept */ + /* operations. However this is not cool because we might inadvertedly */ + /* close a socket that just accepted a new connection, which will */ + /* cause the connection to be aborted. */ + unsigned int i; + for (i = 0; i < uv_simultaneous_server_accepts; i++) { + uv_tcp_accept_t* req = &tcp->accept_reqs[i]; + if (req->accept_socket != INVALID_SOCKET && + !HasOverlappedIoCompleted(&req->overlapped)) { + closesocket(req->accept_socket); + req->accept_socket = INVALID_SOCKET; + } } } } - tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING); + if (tcp->flags & UV_HANDLE_READING) { + tcp->flags &= ~UV_HANDLE_READING; + DECREASE_ACTIVE_COUNT(loop, tcp); + } + if (tcp->flags & UV_HANDLE_LISTENING) { + tcp->flags &= ~UV_HANDLE_LISTENING; + DECREASE_ACTIVE_COUNT(loop, tcp); + } + if (close_socket) { closesocket(tcp->socket); tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED; } + + uv__handle_start(tcp); if (tcp->reqs_pending == 0) { uv_want_endgame(tcp->loop, (uv_handle_t*)tcp); } }