ext/libuv/src/unix/stream.c in libuv-1.0.3 vs ext/libuv/src/unix/stream.c in libuv-1.1.0
- old
+ new
@@ -359,33 +359,36 @@
return 0;
}
-void uv__stream_destroy(uv_stream_t* stream) {
+void uv__stream_flush_write_queue(uv_stream_t* stream, int error) {
uv_write_t* req;
QUEUE* q;
+ while (!QUEUE_EMPTY(&stream->write_queue)) {
+ q = QUEUE_HEAD(&stream->write_queue);
+ QUEUE_REMOVE(q);
+ req = QUEUE_DATA(q, uv_write_t, queue);
+ req->error = error;
+
+ QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
+ }
+}
+
+
+void uv__stream_destroy(uv_stream_t* stream) {
assert(!uv__io_active(&stream->io_watcher, UV__POLLIN | UV__POLLOUT));
assert(stream->flags & UV_CLOSED);
if (stream->connect_req) {
uv__req_unregister(stream->loop, stream->connect_req);
stream->connect_req->cb(stream->connect_req, -ECANCELED);
stream->connect_req = NULL;
}
- while (!QUEUE_EMPTY(&stream->write_queue)) {
- q = QUEUE_HEAD(&stream->write_queue);
- QUEUE_REMOVE(q);
-
- req = QUEUE_DATA(q, uv_write_t, queue);
- req->error = -ECANCELED;
-
- QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
- }
-
+ uv__stream_flush_write_queue(stream, -ECANCELED);
uv__write_callbacks(stream);
if (stream->shutdown_req) {
/* The ECANCELED error code is a lie, the shutdown(2) syscall is a
* fait accompli at this point. Maybe we should revisit this in v0.11.
@@ -535,11 +538,11 @@
goto done;
}
break;
default:
- assert(0);
+ return -EINVAL;
}
done:
/* Process queued fds */
if (server->queued_fds != NULL) {
@@ -571,22 +574,21 @@
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;
- err = -EINVAL;
switch (stream->type) {
case UV_TCP:
err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;
case UV_NAMED_PIPE:
err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
break;
default:
- assert(0);
+ err = -EINVAL;
}
if (err == 0)
uv__handle_start(stream);
@@ -1161,11 +1163,11 @@
}
assert(uv__stream_fd(stream) >= 0);
/* Ignore POLLHUP here. Even it it's set, there may still be data to read. */
- if (events & (UV__POLLIN | UV__POLLERR))
+ if (events & (UV__POLLIN | UV__POLLERR | UV__POLLHUP))
uv__read(stream);
if (uv__stream_fd(stream) == -1)
return; /* read_cb closed stream. */
@@ -1231,13 +1233,24 @@
if (error == -EINPROGRESS)
return;
stream->connect_req = NULL;
uv__req_unregister(stream->loop, req);
- uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
+ if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) {
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
+ }
+
if (req->cb)
req->cb(req, error);
+
+ if (uv__stream_fd(stream) == -1)
+ return;
+
+ if (error < 0) {
+ uv__stream_flush_write_queue(stream, -ECANCELED);
+ uv__write_callbacks(stream);
+ }
}
int uv_write2(uv_write_t* req,
uv_stream_t* stream,