ext/libuv/src/unix/stream.c in libuv-1.0.0 vs ext/libuv/src/unix/stream.c in libuv-1.0.2
- old
+ new
@@ -58,25 +58,14 @@
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
+static void uv__write_callbacks(uv_stream_t* stream);
static size_t uv__write_req_size(uv_write_t* req);
-static size_t uv_count_bufs(const uv_buf_t bufs[], unsigned int nbufs) {
- unsigned int i;
- size_t bytes;
-
- bytes = 0;
- for (i = 0; i < nbufs; i++)
- bytes += bufs[i].len;
-
- return bytes;
-}
-
-
void uv__stream_init(uv_loop_t* loop,
uv_stream_t* stream,
uv_handle_type type) {
int err;
@@ -388,48 +377,29 @@
while (!QUEUE_EMPTY(&stream->write_queue)) {
q = QUEUE_HEAD(&stream->write_queue);
QUEUE_REMOVE(q);
req = QUEUE_DATA(q, uv_write_t, queue);
- uv__req_unregister(stream->loop, req);
+ req->error = -ECANCELED;
- if (req->bufs != req->bufsml)
- free(req->bufs);
- req->bufs = NULL;
-
- if (req->cb != NULL)
- req->cb(req, -ECANCELED);
+ QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
}
- while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
- q = QUEUE_HEAD(&stream->write_completed_queue);
- QUEUE_REMOVE(q);
+ uv__write_callbacks(stream);
- req = QUEUE_DATA(q, uv_write_t, queue);
- uv__req_unregister(stream->loop, req);
-
- if (req->bufs != NULL) {
- stream->write_queue_size -= uv__write_req_size(req);
- if (req->bufs != req->bufsml)
- free(req->bufs);
- req->bufs = NULL;
- }
-
- if (req->cb)
- req->cb(req, req->error);
- }
-
if (stream->shutdown_req) {
/* The ECANCELED error code is a lie, the shutdown(2) syscall is a
* fait accompli at this point. Maybe we should revisit this in v0.11.
* A possible reason for leaving it unchanged is that it informs the
* callee that the handle has been destroyed.
*/
uv__req_unregister(stream->loop, stream->shutdown_req);
stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);
stream->shutdown_req = NULL;
}
+
+ assert(stream->write_queue_size == 0);
}
/* Implements a best effort approach to mitigating accept() EMFILE errors.
* We have a spare file descriptor stashed away that we close to get below
@@ -658,12 +628,12 @@
static size_t uv__write_req_size(uv_write_t* req) {
size_t size;
assert(req->bufs != NULL);
- size = uv_count_bufs(req->bufs + req->write_index,
- req->nbufs - req->write_index);
+ size = uv__count_bufs(req->bufs + req->write_index,
+ req->nbufs - req->write_index);
assert(req->handle->write_queue_size >= size);
return size;
}
@@ -901,14 +871,10 @@
if (req->cb)
req->cb(req, req->error);
}
assert(QUEUE_EMPTY(&stream->write_completed_queue));
-
- /* Write queue drained. */
- if (QUEUE_EMPTY(&stream->write_queue))
- uv__drain(stream);
}
uv_handle_type uv__handle_type(int fd) {
struct sockaddr_storage ss;
@@ -1221,10 +1187,14 @@
return; /* read_cb closed stream. */
if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) {
uv__write(stream);
uv__write_callbacks(stream);
+
+ /* Write queue drained. */
+ if (QUEUE_EMPTY(&stream->write_queue))
+ uv__drain(stream);
}
}
/**
@@ -1325,11 +1295,11 @@
return -ENOMEM;
memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0]));
req->nbufs = nbufs;
req->write_index = 0;
- stream->write_queue_size += uv_count_bufs(bufs, nbufs);
+ stream->write_queue_size += uv__count_bufs(bufs, nbufs);
/* Append the request to write_queue. */
QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
/* If the queue was empty when this function began, we should attempt to
@@ -1384,20 +1354,20 @@
size_t req_size;
uv_write_t req;
/* Connecting or already writing some data */
if (stream->connect_req != NULL || stream->write_queue_size != 0)
- return 0;
+ return -EAGAIN;
has_pollout = uv__io_active(&stream->io_watcher, UV__POLLOUT);
r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
if (r != 0)
return r;
/* Remove not written bytes from write queue size */
- written = uv_count_bufs(bufs, nbufs);
+ written = uv__count_bufs(bufs, nbufs);
if (req.bufs != NULL)
req_size = uv__write_req_size(&req);
else
req_size = 0;
written -= req_size;
@@ -1414,10 +1384,13 @@
if (!has_pollout) {
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
- return (int) written;
+ if (written == 0)
+ return -EAGAIN;
+ else
+ return written;
}
int uv_read_start(uv_stream_t* stream,
uv_alloc_cb alloc_cb,