ext/noderb_extension/libuv/src/unix/stream.c in noderb-0.0.10 vs ext/noderb_extension/libuv/src/unix/stream.c in noderb-0.0.11
- old
+ new
@@ -127,12 +127,12 @@
q = ngx_queue_head(&stream->write_completed_queue);
ngx_queue_remove(q);
req = ngx_queue_data(q, uv_write_t, queue);
if (req->cb) {
- uv_err_new_artificial(req->handle->loop, UV_OK);
- req->cb(req, 0);
+ uv_err_new_artificial(stream->loop, req->error);
+ req->cb(req, req->error ? -1 : 0);
}
}
}
@@ -285,10 +285,21 @@
}
}
}
+static size_t uv__write_req_size(uv_write_t* req) {
+ size_t size;
+
+ size = uv__buf_count(req->bufs + req->write_index,
+ req->bufcnt - req->write_index);
+ assert(req->handle->write_queue_size >= size);
+
+ return size;
+}
+
+
static void uv__write_req_finish(uv_write_t* req) {
uv_stream_t* stream = req->handle;
/* Pop the req off tcp->write_queue. */
ngx_queue_remove(&req->queue);
@@ -349,10 +360,11 @@
if (n < 0) {
if (errno != EAGAIN) {
/* Error */
req->error = errno;
+ stream->write_queue_size -= uv__write_req_size(req);
uv__write_req_finish(req);
return;
}
} else {
/* Successful write */
@@ -515,12 +527,12 @@
void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
uv_stream_t* stream = watcher->data;
- assert(stream->type == UV_TCP ||
- stream->type == UV_NAMED_PIPE);
+ assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
+ stream->type == UV_TTY);
assert(watcher == &stream->read_watcher ||
watcher == &stream->write_watcher);
assert(!(stream->flags & UV_CLOSING));
if (stream->connect_req) {
@@ -665,12 +677,13 @@
*/
int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
uv_write_cb cb) {
int empty_queue;
- assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE)
- && "uv_write (unix) does not yet support other types of streams");
+ assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
+ stream->type == UV_TTY) &&
+ "uv_write (unix) does not yet support other types of streams");
if (stream->fd < 0) {
uv_err_new(stream->loop, EBADF);
return -1;
}
@@ -723,10 +736,11 @@
return 0;
}
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
- assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
+ assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
+ stream->type == UV_TTY);
if (stream->flags & UV_CLOSING) {
uv_err_new(stream->loop, EINVAL);
return -1;
}