ext/libuv/src/unix/stream.c in libuv-1.0.0 vs ext/libuv/src/unix/stream.c in libuv-1.0.2

- old
+ new

@@ -58,25 +58,14 @@ static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); static void uv__read(uv_stream_t* stream); static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); +static void uv__write_callbacks(uv_stream_t* stream); static size_t uv__write_req_size(uv_write_t* req); -static size_t uv_count_bufs(const uv_buf_t bufs[], unsigned int nbufs) { - unsigned int i; - size_t bytes; - - bytes = 0; - for (i = 0; i < nbufs; i++) - bytes += bufs[i].len; - - return bytes; -} - - void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream, uv_handle_type type) { int err; @@ -388,48 +377,29 @@ while (!QUEUE_EMPTY(&stream->write_queue)) { q = QUEUE_HEAD(&stream->write_queue); QUEUE_REMOVE(q); req = QUEUE_DATA(q, uv_write_t, queue); - uv__req_unregister(stream->loop, req); + req->error = -ECANCELED; - if (req->bufs != req->bufsml) - free(req->bufs); - req->bufs = NULL; - - if (req->cb != NULL) - req->cb(req, -ECANCELED); + QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); } - while (!QUEUE_EMPTY(&stream->write_completed_queue)) { - q = QUEUE_HEAD(&stream->write_completed_queue); - QUEUE_REMOVE(q); + uv__write_callbacks(stream); - req = QUEUE_DATA(q, uv_write_t, queue); - uv__req_unregister(stream->loop, req); - - if (req->bufs != NULL) { - stream->write_queue_size -= uv__write_req_size(req); - if (req->bufs != req->bufsml) - free(req->bufs); - req->bufs = NULL; - } - - if (req->cb) - req->cb(req, req->error); - } - if (stream->shutdown_req) { /* The ECANCELED error code is a lie, the shutdown(2) syscall is a * fait accompli at this point. Maybe we should revisit this in v0.11. * A possible reason for leaving it unchanged is that it informs the * callee that the handle has been destroyed. */ uv__req_unregister(stream->loop, stream->shutdown_req); stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED); stream->shutdown_req = NULL; } + + assert(stream->write_queue_size == 0); } /* Implements a best effort approach to mitigating accept() EMFILE errors. * We have a spare file descriptor stashed away that we close to get below @@ -658,12 +628,12 @@ static size_t uv__write_req_size(uv_write_t* req) { size_t size; assert(req->bufs != NULL); - size = uv_count_bufs(req->bufs + req->write_index, - req->nbufs - req->write_index); + size = uv__count_bufs(req->bufs + req->write_index, + req->nbufs - req->write_index); assert(req->handle->write_queue_size >= size); return size; } @@ -901,14 +871,10 @@ if (req->cb) req->cb(req, req->error); } assert(QUEUE_EMPTY(&stream->write_completed_queue)); - - /* Write queue drained. */ - if (QUEUE_EMPTY(&stream->write_queue)) - uv__drain(stream); } uv_handle_type uv__handle_type(int fd) { struct sockaddr_storage ss; @@ -1221,10 +1187,14 @@ return; /* read_cb closed stream. */ if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) { uv__write(stream); uv__write_callbacks(stream); + + /* Write queue drained. */ + if (QUEUE_EMPTY(&stream->write_queue)) + uv__drain(stream); } } /** @@ -1325,11 +1295,11 @@ return -ENOMEM; memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); req->nbufs = nbufs; req->write_index = 0; - stream->write_queue_size += uv_count_bufs(bufs, nbufs); + stream->write_queue_size += uv__count_bufs(bufs, nbufs); /* Append the request to write_queue. */ QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue); /* If the queue was empty when this function began, we should attempt to @@ -1384,20 +1354,20 @@ size_t req_size; uv_write_t req; /* Connecting or already writing some data */ if (stream->connect_req != NULL || stream->write_queue_size != 0) - return 0; + return -EAGAIN; has_pollout = uv__io_active(&stream->io_watcher, UV__POLLOUT); r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb); if (r != 0) return r; /* Remove not written bytes from write queue size */ - written = uv_count_bufs(bufs, nbufs); + written = uv__count_bufs(bufs, nbufs); if (req.bufs != NULL) req_size = uv__write_req_size(&req); else req_size = 0; written -= req_size; @@ -1414,10 +1384,13 @@ if (!has_pollout) { uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); uv__stream_osx_interrupt_select(stream); } - return (int) written; + if (written == 0) + return -EAGAIN; + else + return written; } int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,