ext/libuv/src/unix/stream.c in libuv-1.0.3 vs ext/libuv/src/unix/stream.c in libuv-1.1.0

- old
+ new

@@ -359,33 +359,36 @@ return 0; } -void uv__stream_destroy(uv_stream_t* stream) { +void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { uv_write_t* req; QUEUE* q; + while (!QUEUE_EMPTY(&stream->write_queue)) { + q = QUEUE_HEAD(&stream->write_queue); + QUEUE_REMOVE(q); + req = QUEUE_DATA(q, uv_write_t, queue); + req->error = error; + + QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); + } +} + + +void uv__stream_destroy(uv_stream_t* stream) { assert(!uv__io_active(&stream->io_watcher, UV__POLLIN | UV__POLLOUT)); assert(stream->flags & UV_CLOSED); if (stream->connect_req) { uv__req_unregister(stream->loop, stream->connect_req); stream->connect_req->cb(stream->connect_req, -ECANCELED); stream->connect_req = NULL; } - while (!QUEUE_EMPTY(&stream->write_queue)) { - q = QUEUE_HEAD(&stream->write_queue); - QUEUE_REMOVE(q); - - req = QUEUE_DATA(q, uv_write_t, queue); - req->error = -ECANCELED; - - QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); - } - + uv__stream_flush_write_queue(stream, -ECANCELED); uv__write_callbacks(stream); if (stream->shutdown_req) { /* The ECANCELED error code is a lie, the shutdown(2) syscall is a * fait accompli at this point. Maybe we should revisit this in v0.11. @@ -535,11 +538,11 @@ goto done; } break; default: - assert(0); + return -EINVAL; } done: /* Process queued fds */ if (server->queued_fds != NULL) { @@ -571,22 +574,21 @@ int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int err; - err = -EINVAL; switch (stream->type) { case UV_TCP: err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb); break; case UV_NAMED_PIPE: err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb); break; default: - assert(0); + err = -EINVAL; } if (err == 0) uv__handle_start(stream); @@ -1161,11 +1163,11 @@ } assert(uv__stream_fd(stream) >= 0); /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */ - if (events & (UV__POLLIN | UV__POLLERR)) + if (events & (UV__POLLIN | UV__POLLERR | UV__POLLHUP)) uv__read(stream); if (uv__stream_fd(stream) == -1) return; /* read_cb closed stream. */ @@ -1231,13 +1233,24 @@ if (error == -EINPROGRESS) return; stream->connect_req = NULL; uv__req_unregister(stream->loop, req); - uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); + if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) { + uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); + } + if (req->cb) req->cb(req, error); + + if (uv__stream_fd(stream) == -1) + return; + + if (error < 0) { + uv__stream_flush_write_queue(stream, -ECANCELED); + uv__write_callbacks(stream); + } } int uv_write2(uv_write_t* req, uv_stream_t* stream,