ext/libuv/src/unix/stream.c in libuv-0.10.0 vs ext/libuv/src/unix/stream.c in libuv-0.10.2
- old
+ new
@@ -1,1448 +1,1448 @@
-/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to
- * deal in the Software without restriction, including without limitation the
- * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- * sell copies of the Software, and to permit persons to whom the Software is
- * furnished to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in
- * all copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- * IN THE SOFTWARE.
- */
-
-#include "uv.h"
-#include "internal.h"
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <assert.h>
-#include <errno.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/uio.h>
-#include <sys/un.h>
-#include <unistd.h>
-#include <limits.h> /* IOV_MAX */
-
-#if defined(__APPLE__)
-# include <sys/event.h>
-# include <sys/time.h>
-# include <sys/select.h>
-
-/* Forward declaration */
-typedef struct uv__stream_select_s uv__stream_select_t;
-
-struct uv__stream_select_s {
- uv_stream_t* stream;
- uv_thread_t thread;
- uv_sem_t close_sem;
- uv_sem_t async_sem;
- uv_async_t async;
- int events;
- int fake_fd;
- int int_fd;
- int fd;
-};
-#endif /* defined(__APPLE__) */
-
-static void uv__stream_connect(uv_stream_t*);
-static void uv__write(uv_stream_t* stream);
-static void uv__read(uv_stream_t* stream);
-static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
-static size_t uv__write_req_size(uv_write_t* req);
-
-
-/* Used by the accept() EMFILE party trick. */
-static int uv__open_cloexec(const char* path, int flags) {
- int err;
- int fd;
-
-#if defined(__linux__)
- fd = open(path, flags | UV__O_CLOEXEC);
- if (fd != -1)
- return fd;
-
- if (errno != EINVAL)
- return -errno;
-
- /* O_CLOEXEC not supported. */
-#endif
-
- fd = open(path, flags);
- if (fd == -1)
- return -errno;
-
- err = uv__cloexec(fd, 1);
- if (err) {
- close(fd);
- return err;
- }
-
- return fd;
-}
-
-
-static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
- size_t total = 0;
- int i;
-
- for (i = 0; i < bufcnt; i++) {
- total += bufs[i].len;
- }
-
- return total;
-}
-
-
-void uv__stream_init(uv_loop_t* loop,
- uv_stream_t* stream,
- uv_handle_type type) {
- int err;
-
- uv__handle_init(loop, (uv_handle_t*)stream, type);
- stream->read_cb = NULL;
- stream->read2_cb = NULL;
- stream->alloc_cb = NULL;
- stream->close_cb = NULL;
- stream->connection_cb = NULL;
- stream->connect_req = NULL;
- stream->shutdown_req = NULL;
- stream->accepted_fd = -1;
- stream->delayed_error = 0;
- QUEUE_INIT(&stream->write_queue);
- QUEUE_INIT(&stream->write_completed_queue);
- stream->write_queue_size = 0;
-
- if (loop->emfile_fd == -1) {
- err = uv__open_cloexec("/", O_RDONLY);
- if (err >= 0)
- loop->emfile_fd = err;
- }
-
-#if defined(__APPLE__)
- stream->select = NULL;
-#endif /* defined(__APPLE_) */
-
- uv__io_init(&stream->io_watcher, uv__stream_io, -1);
-}
-
-
-#if defined(__APPLE__)
-static void uv__stream_osx_select(void* arg) {
- uv_stream_t* stream;
- uv__stream_select_t* s;
- char buf[1024];
- fd_set sread;
- fd_set swrite;
- int events;
- int fd;
- int r;
- int max_fd;
-
- stream = arg;
- s = stream->select;
- fd = s->fd;
-
- if (fd > s->int_fd)
- max_fd = fd;
- else
- max_fd = s->int_fd;
-
- while (1) {
- /* Terminate on semaphore */
- if (uv_sem_trywait(&s->close_sem) == 0)
- break;
-
- /* Watch fd using select(2) */
- FD_ZERO(&sread);
- FD_ZERO(&swrite);
-
- if (uv_is_readable(stream))
- FD_SET(fd, &sread);
- if (uv_is_writable(stream))
- FD_SET(fd, &swrite);
- FD_SET(s->int_fd, &sread);
-
- /* Wait indefinitely for fd events */
- r = select(max_fd + 1, &sread, &swrite, NULL, NULL);
- if (r == -1) {
- if (errno == EINTR)
- continue;
-
- /* XXX: Possible?! */
- abort();
- }
-
- /* Ignore timeouts */
- if (r == 0)
- continue;
-
- /* Empty socketpair's buffer in case of interruption */
- if (FD_ISSET(s->int_fd, &sread))
- while (1) {
- r = read(s->int_fd, buf, sizeof(buf));
-
- if (r == sizeof(buf))
- continue;
-
- if (r != -1)
- break;
-
- if (errno == EAGAIN || errno == EWOULDBLOCK)
- break;
-
- if (errno == EINTR)
- continue;
-
- abort();
- }
-
- /* Handle events */
- events = 0;
- if (FD_ISSET(fd, &sread))
- events |= UV__POLLIN;
- if (FD_ISSET(fd, &swrite))
- events |= UV__POLLOUT;
-
- assert(events != 0 || FD_ISSET(s->int_fd, &sread));
- if (events != 0) {
- ACCESS_ONCE(int, s->events) = events;
-
- uv_async_send(&s->async);
- uv_sem_wait(&s->async_sem);
-
- /* Should be processed at this stage */
- assert((s->events == 0) || (stream->flags & UV_CLOSING));
- }
- }
-}
-
-
-static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
- /* Notify select() thread about state change */
- uv__stream_select_t* s;
- int r;
-
- s = stream->select;
-
- /* Interrupt select() loop
- * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
- * emit read event on other side
- */
- do
- r = write(s->fake_fd, "x", 1);
- while (r == -1 && errno == EINTR);
-
- assert(r == 1);
-}
-
-
-static void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
- uv__stream_select_t* s;
- uv_stream_t* stream;
- int events;
-
- s = container_of(handle, uv__stream_select_t, async);
- stream = s->stream;
-
- /* Get and reset stream's events */
- events = s->events;
- ACCESS_ONCE(int, s->events) = 0;
- uv_sem_post(&s->async_sem);
-
- assert(events != 0);
- assert(events == (events & (UV__POLLIN | UV__POLLOUT)));
-
- /* Invoke callback on event-loop */
- if ((events & UV__POLLIN) && uv__io_active(&stream->io_watcher, UV__POLLIN))
- uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLIN);
-
- if ((events & UV__POLLOUT) && uv__io_active(&stream->io_watcher, UV__POLLOUT))
- uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLOUT);
-}
-
-
-static void uv__stream_osx_cb_close(uv_handle_t* async) {
- uv__stream_select_t* s;
-
- s = container_of(async, uv__stream_select_t, async);
- free(s);
-}
-
-
-int uv__stream_try_select(uv_stream_t* stream, int* fd) {
- /*
- * kqueue doesn't work with some files from /dev mount on osx.
- * select(2) in separate thread for those fds
- */
-
- struct kevent filter[1];
- struct kevent events[1];
- struct timespec timeout;
- uv__stream_select_t* s;
- int fds[2];
- int err;
- int ret;
- int kq;
-
- kq = kqueue();
- if (kq == -1) {
- fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", errno);
- return -errno;
- }
-
- EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
-
- /* Use small timeout, because we only want to capture EINVALs */
- timeout.tv_sec = 0;
- timeout.tv_nsec = 1;
-
- ret = kevent(kq, filter, 1, events, 1, &timeout);
- SAVE_ERRNO(close(kq));
-
- if (ret == -1)
- return -errno;
-
- if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
- return 0;
-
- /* At this point we definitely know that this fd won't work with kqueue */
- s = malloc(sizeof(*s));
- if (s == NULL)
- return -ENOMEM;
-
- s->events = 0;
- s->fd = *fd;
-
- err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
- if (err) {
- free(s);
- return err;
- }
-
- s->async.flags |= UV__HANDLE_INTERNAL;
- uv__handle_unref(&s->async);
-
- if (uv_sem_init(&s->close_sem, 0))
- goto fatal1;
-
- if (uv_sem_init(&s->async_sem, 0))
- goto fatal2;
-
- /* Create fds for io watcher and to interrupt the select() loop. */
- if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
- goto fatal3;
-
- s->fake_fd = fds[0];
- s->int_fd = fds[1];
-
- if (uv_thread_create(&s->thread, uv__stream_osx_select, stream))
- goto fatal4;
-
- s->stream = stream;
- stream->select = s;
- *fd = s->fake_fd;
-
- return 0;
-
-fatal4:
- close(s->fake_fd);
- close(s->int_fd);
- s->fake_fd = -1;
- s->int_fd = -1;
-fatal3:
- uv_sem_destroy(&s->async_sem);
-fatal2:
- uv_sem_destroy(&s->close_sem);
-fatal1:
- uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
- return -errno;
-}
-#endif /* defined(__APPLE__) */
-
-
-int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
- assert(fd >= 0);
- stream->flags |= flags;
-
- if (stream->type == UV_TCP) {
- if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
- return -errno;
-
- /* TODO Use delay the user passed in. */
- if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60))
- return -errno;
- }
-
- stream->io_watcher.fd = fd;
-
- return 0;
-}
-
-
-void uv__stream_destroy(uv_stream_t* stream) {
- uv_write_t* req;
- QUEUE* q;
-
- assert(!uv__io_active(&stream->io_watcher, UV__POLLIN | UV__POLLOUT));
- assert(stream->flags & UV_CLOSED);
-
- if (stream->connect_req) {
- uv__req_unregister(stream->loop, stream->connect_req);
- stream->connect_req->cb(stream->connect_req, -ECANCELED);
- stream->connect_req = NULL;
- }
-
- while (!QUEUE_EMPTY(&stream->write_queue)) {
- q = QUEUE_HEAD(&stream->write_queue);
- QUEUE_REMOVE(q);
-
- req = QUEUE_DATA(q, uv_write_t, queue);
- uv__req_unregister(stream->loop, req);
-
- if (req->bufs != req->bufsml)
- free(req->bufs);
- req->bufs = NULL;
-
- if (req->cb != NULL)
- req->cb(req, -ECANCELED);
- }
-
- while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
- q = QUEUE_HEAD(&stream->write_completed_queue);
- QUEUE_REMOVE(q);
-
- req = QUEUE_DATA(q, uv_write_t, queue);
- uv__req_unregister(stream->loop, req);
-
- if (req->bufs != NULL) {
- stream->write_queue_size -= uv__write_req_size(req);
- if (req->bufs != req->bufsml)
- free(req->bufs);
- req->bufs = NULL;
- }
-
- if (req->cb)
- req->cb(req, req->error);
- }
-
- if (stream->shutdown_req) {
- /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
- * fait accompli at this point. Maybe we should revisit this in v0.11.
- * A possible reason for leaving it unchanged is that it informs the
- * callee that the handle has been destroyed.
- */
- uv__req_unregister(stream->loop, stream->shutdown_req);
- stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);
- stream->shutdown_req = NULL;
- }
-}
-
-
-/* Implements a best effort approach to mitigating accept() EMFILE errors.
- * We have a spare file descriptor stashed away that we close to get below
- * the EMFILE limit. Next, we accept all pending connections and close them
- * immediately to signal the clients that we're overloaded - and we are, but
- * we still keep on trucking.
- *
- * There is one caveat: it's not reliable in a multi-threaded environment.
- * The file descriptor limit is per process. Our party trick fails if another
- * thread opens a file or creates a socket in the time window between us
- * calling close() and accept().
- */
-static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
- int fd;
-
- if (loop->emfile_fd == -1)
- return -EMFILE;
-
- close(loop->emfile_fd);
-
- for (;;) {
- fd = uv__accept(accept_fd);
-
- if (fd != -1) {
- close(fd);
- continue;
- }
-
- if (errno == EINTR)
- continue;
-
- SAVE_ERRNO(loop->emfile_fd = uv__open_cloexec("/", O_RDONLY));
- return -errno;
- }
-}
-
-
-#if defined(UV_HAVE_KQUEUE)
-# define UV_DEC_BACKLOG(w) w->rcount--;
-#else
-# define UV_DEC_BACKLOG(w) /* no-op */
-#endif /* defined(UV_HAVE_KQUEUE) */
-
-
-void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
- uv_stream_t* stream;
- int err;
-
- stream = container_of(w, uv_stream_t, io_watcher);
- assert(events == UV__POLLIN);
- assert(stream->accepted_fd == -1);
- assert(!(stream->flags & UV_CLOSING));
-
- uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
-
- /* connection_cb can close the server socket while we're
- * in the loop so check it on each iteration.
- */
- while (uv__stream_fd(stream) != -1) {
- assert(stream->accepted_fd == -1);
-
-#if defined(UV_HAVE_KQUEUE)
- if (w->rcount <= 0)
- return;
-#endif /* defined(UV_HAVE_KQUEUE) */
-
- err = uv__accept(uv__stream_fd(stream));
- if (err < 0) {
- if (err == -EAGAIN || err == -EWOULDBLOCK)
- return; /* Not an error. */
-
- if (err == -ECONNABORTED)
- continue; /* Ignore. Nothing we can do about that. */
-
- if (err == -EMFILE || err == -ENFILE) {
- err = uv__emfile_trick(loop, uv__stream_fd(stream));
- if (err == -EAGAIN || err == -EWOULDBLOCK)
- break;
- }
-
- stream->connection_cb(stream, err);
- continue;
- }
-
- UV_DEC_BACKLOG(w)
- stream->accepted_fd = err;
- stream->connection_cb(stream, 0);
-
- if (stream->accepted_fd != -1) {
- /* The user hasn't yet accepted called uv_accept() */
- uv__io_stop(loop, &stream->io_watcher, UV__POLLIN);
- return;
- }
-
- if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) {
- /* Give other processes a chance to accept connections. */
- struct timespec timeout = { 0, 1 };
- nanosleep(&timeout, NULL);
- }
- }
-}
-
-
-#undef UV_DEC_BACKLOG
-
-
-int uv_accept(uv_stream_t* server, uv_stream_t* client) {
- int err;
-
- /* TODO document this */
- assert(server->loop == client->loop);
-
- if (server->accepted_fd == -1)
- return -EAGAIN;
-
- switch (client->type) {
- case UV_NAMED_PIPE:
- case UV_TCP:
- err = uv__stream_open(client,
- server->accepted_fd,
- UV_STREAM_READABLE | UV_STREAM_WRITABLE);
- if (err) {
- /* TODO handle error */
- close(server->accepted_fd);
- server->accepted_fd = -1;
- return err;
- }
- break;
-
- case UV_UDP:
- err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
- if (err) {
- close(server->accepted_fd);
- server->accepted_fd = -1;
- return err;
- }
- break;
-
- default:
- assert(0);
- }
-
- uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
- server->accepted_fd = -1;
- return 0;
-}
-
-
-int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
- int err;
-
- err = -EINVAL;
- switch (stream->type) {
- case UV_TCP:
- err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
- break;
-
- case UV_NAMED_PIPE:
- err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
- break;
-
- default:
- assert(0);
- }
-
- if (err == 0)
- uv__handle_start(stream);
-
- return err;
-}
-
-
-static void uv__drain(uv_stream_t* stream) {
- uv_shutdown_t* req;
- int err;
-
- assert(QUEUE_EMPTY(&stream->write_queue));
- uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
-
- /* Shutdown? */
- if ((stream->flags & UV_STREAM_SHUTTING) &&
- !(stream->flags & UV_CLOSING) &&
- !(stream->flags & UV_STREAM_SHUT)) {
- assert(stream->shutdown_req);
-
- req = stream->shutdown_req;
- stream->shutdown_req = NULL;
- stream->flags &= ~UV_STREAM_SHUTTING;
- uv__req_unregister(stream->loop, req);
-
- err = 0;
- if (shutdown(uv__stream_fd(stream), SHUT_WR))
- err = -errno;
-
- if (err == 0)
- stream->flags |= UV_STREAM_SHUT;
-
- if (req->cb != NULL)
- req->cb(req, err);
- }
-}
-
-
-static size_t uv__write_req_size(uv_write_t* req) {
- size_t size;
-
- assert(req->bufs != NULL);
- size = uv__buf_count(req->bufs + req->write_index,
- req->bufcnt - req->write_index);
- assert(req->handle->write_queue_size >= size);
-
- return size;
-}
-
-
-static void uv__write_req_finish(uv_write_t* req) {
- uv_stream_t* stream = req->handle;
-
- /* Pop the req off tcp->write_queue. */
- QUEUE_REMOVE(&req->queue);
-
- /* Only free when there was no error. On error, we touch up write_queue_size
- * right before making the callback. The reason we don't do that right away
- * is that a write_queue_size > 0 is our only way to signal to the user that
- * he should stop writing - which he should if we got an error. Something to
- * revisit in future revisions of the libuv API.
- */
- if (req->error == 0) {
- if (req->bufs != req->bufsml)
- free(req->bufs);
- req->bufs = NULL;
- }
-
- /* Add it to the write_completed_queue where it will have its
- * callback called in the near future.
- */
- QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
- uv__io_feed(stream->loop, &stream->io_watcher);
-}
-
-
-static int uv__handle_fd(uv_handle_t* handle) {
- switch (handle->type) {
- case UV_NAMED_PIPE:
- case UV_TCP:
- return ((uv_stream_t*) handle)->io_watcher.fd;
-
- case UV_UDP:
- return ((uv_udp_t*) handle)->io_watcher.fd;
-
- default:
- return -1;
- }
-}
-
-static int uv__getiovmax() {
-#if defined(IOV_MAX)
- return IOV_MAX;
-#elif defined(_SC_IOV_MAX)
- static int iovmax = -1;
- if (iovmax == -1)
- iovmax = sysconf(_SC_IOV_MAX);
- return iovmax;
-#else
- return 1024;
-#endif
-}
-
-static void uv__write(uv_stream_t* stream) {
- struct iovec* iov;
- QUEUE* q;
- uv_write_t* req;
- int iovmax;
- int iovcnt;
- ssize_t n;
-
-start:
-
- assert(uv__stream_fd(stream) >= 0);
-
- if (QUEUE_EMPTY(&stream->write_queue))
- return;
-
- q = QUEUE_HEAD(&stream->write_queue);
- req = QUEUE_DATA(q, uv_write_t, queue);
- assert(req->handle == stream);
-
- /*
- * Cast to iovec. We had to have our own uv_buf_t instead of iovec
- * because Windows's WSABUF is not an iovec.
- */
- assert(sizeof(uv_buf_t) == sizeof(struct iovec));
- iov = (struct iovec*) &(req->bufs[req->write_index]);
- iovcnt = req->bufcnt - req->write_index;
-
- iovmax = uv__getiovmax();
-
- /* Limit iov count to avoid EINVALs from writev() */
- if (iovcnt > iovmax)
- iovcnt = iovmax;
-
- /*
- * Now do the actual writev. Note that we've been updating the pointers
- * inside the iov each time we write. So there is no need to offset it.
- */
-
- if (req->send_handle) {
- struct msghdr msg;
- char scratch[64];
- struct cmsghdr *cmsg;
- int fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
-
- assert(fd_to_send >= 0);
-
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- msg.msg_iov = iov;
- msg.msg_iovlen = iovcnt;
- msg.msg_flags = 0;
-
- msg.msg_control = (void*) scratch;
- msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
-
- cmsg = CMSG_FIRSTHDR(&msg);
- cmsg->cmsg_level = SOL_SOCKET;
- cmsg->cmsg_type = SCM_RIGHTS;
- cmsg->cmsg_len = msg.msg_controllen;
-
- /* silence aliasing warning */
- {
- void* pv = CMSG_DATA(cmsg);
- int* pi = pv;
- *pi = fd_to_send;
- }
-
- do {
- n = sendmsg(uv__stream_fd(stream), &msg, 0);
- }
- while (n == -1 && errno == EINTR);
- } else {
- do {
- if (iovcnt == 1) {
- n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
- } else {
- n = writev(uv__stream_fd(stream), iov, iovcnt);
- }
- }
- while (n == -1 && errno == EINTR);
- }
-
- if (n < 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- /* Error */
- req->error = -errno;
- uv__write_req_finish(req);
- uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
- if (!uv__io_active(&stream->io_watcher, UV__POLLIN))
- uv__handle_stop(stream);
- return;
- } else if (stream->flags & UV_STREAM_BLOCKING) {
- /* If this is a blocking stream, try again. */
- goto start;
- }
- } else {
- /* Successful write */
-
- while (n >= 0) {
- uv_buf_t* buf = &(req->bufs[req->write_index]);
- size_t len = buf->len;
-
- assert(req->write_index < req->bufcnt);
-
- if ((size_t)n < len) {
- buf->base += n;
- buf->len -= n;
- stream->write_queue_size -= n;
- n = 0;
-
- /* There is more to write. */
- if (stream->flags & UV_STREAM_BLOCKING) {
- /*
- * If we're blocking then we should not be enabling the write
- * watcher - instead we need to try again.
- */
- goto start;
- } else {
- /* Break loop and ensure the watcher is pending. */
- break;
- }
-
- } else {
- /* Finished writing the buf at index req->write_index. */
- req->write_index++;
-
- assert((size_t)n >= len);
- n -= len;
-
- assert(stream->write_queue_size >= len);
- stream->write_queue_size -= len;
-
- if (req->write_index == req->bufcnt) {
- /* Then we're done! */
- assert(n == 0);
- uv__write_req_finish(req);
- /* TODO: start trying to write the next request. */
- return;
- }
- }
- }
- }
-
- /* Either we've counted n down to zero or we've got EAGAIN. */
- assert(n == 0 || n == -1);
-
- /* Only non-blocking streams should use the write_watcher. */
- assert(!(stream->flags & UV_STREAM_BLOCKING));
-
- /* We're not done. */
- uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
-}
-
-
-static void uv__write_callbacks(uv_stream_t* stream) {
- uv_write_t* req;
- QUEUE* q;
-
- while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
- /* Pop a req off write_completed_queue. */
- q = QUEUE_HEAD(&stream->write_completed_queue);
- req = QUEUE_DATA(q, uv_write_t, queue);
- QUEUE_REMOVE(q);
- uv__req_unregister(stream->loop, req);
-
- if (req->bufs != NULL) {
- stream->write_queue_size -= uv__write_req_size(req);
- if (req->bufs != req->bufsml)
- free(req->bufs);
- req->bufs = NULL;
- }
-
- /* NOTE: call callback AFTER freeing the request data. */
- if (req->cb)
- req->cb(req, req->error);
- }
-
- assert(QUEUE_EMPTY(&stream->write_completed_queue));
-
- /* Write queue drained. */
- if (QUEUE_EMPTY(&stream->write_queue))
- uv__drain(stream);
-}
-
-
-static uv_handle_type uv__handle_type(int fd) {
- struct sockaddr_storage ss;
- socklen_t len;
- int type;
-
- memset(&ss, 0, sizeof(ss));
- len = sizeof(ss);
-
- if (getsockname(fd, (struct sockaddr*)&ss, &len))
- return UV_UNKNOWN_HANDLE;
-
- len = sizeof type;
-
- if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
- return UV_UNKNOWN_HANDLE;
-
- if (type == SOCK_STREAM) {
- switch (ss.ss_family) {
- case AF_UNIX:
- return UV_NAMED_PIPE;
- case AF_INET:
- case AF_INET6:
- return UV_TCP;
- }
- }
-
- if (type == SOCK_DGRAM &&
- (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
- return UV_UDP;
-
- return UV_UNKNOWN_HANDLE;
-}
-
-
-static void uv__stream_read_cb(uv_stream_t* stream,
- int status,
- uv_buf_t buf,
- uv_handle_type type) {
- if (stream->read_cb != NULL)
- stream->read_cb(stream, status, buf);
- else
- stream->read2_cb((uv_pipe_t*) stream, status, buf, type);
-}
-
-
-static void uv__stream_eof(uv_stream_t* stream, uv_buf_t buf) {
- stream->flags |= UV_STREAM_READ_EOF;
- uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
- if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
- uv__handle_stop(stream);
- uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE);
-}
-
-
-static void uv__read(uv_stream_t* stream) {
- uv_buf_t buf;
- ssize_t nread;
- struct msghdr msg;
- struct cmsghdr* cmsg;
- char cmsg_space[64];
- int count;
-
- stream->flags &= ~UV_STREAM_READ_PARTIAL;
-
- /* Prevent loop starvation when the data comes in as fast as (or faster than)
- * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
- */
- count = 32;
-
- /* XXX: Maybe instead of having UV_STREAM_READING we just test if
- * tcp->read_cb is NULL or not?
- */
- while ((stream->read_cb || stream->read2_cb)
- && (stream->flags & UV_STREAM_READING)
- && (count-- > 0)) {
- assert(stream->alloc_cb != NULL);
-
- buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
- if (buf.len == 0) {
- /* User indicates it can't or won't handle the read. */
- uv__stream_read_cb(stream, UV_ENOBUFS, buf, UV_UNKNOWN_HANDLE);
- return;
- }
-
- assert(buf.base != NULL);
- assert(uv__stream_fd(stream) >= 0);
-
- if (stream->read_cb) {
- do {
- nread = read(uv__stream_fd(stream), buf.base, buf.len);
- }
- while (nread < 0 && errno == EINTR);
- } else {
- assert(stream->read2_cb);
- /* read2_cb uses recvmsg */
- msg.msg_flags = 0;
- msg.msg_iov = (struct iovec*) &buf;
- msg.msg_iovlen = 1;
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
- /* Set up to receive a descriptor even if one isn't in the message */
- msg.msg_controllen = 64;
- msg.msg_control = (void*) cmsg_space;
-
- do {
- nread = recvmsg(uv__stream_fd(stream), &msg, 0);
- }
- while (nread < 0 && errno == EINTR);
- }
-
- if (nread < 0) {
- /* Error */
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- /* Wait for the next one. */
- if (stream->flags & UV_STREAM_READING) {
- uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
- }
- uv__stream_read_cb(stream, 0, buf, UV_UNKNOWN_HANDLE);
- } else {
- /* Error. User should call uv_close(). */
- uv__stream_read_cb(stream, -errno, buf, UV_UNKNOWN_HANDLE);
- assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) &&
- "stream->read_cb(status=-1) did not call uv_close()");
- }
- return;
- } else if (nread == 0) {
- uv__stream_eof(stream, buf);
- return;
- } else {
- /* Successful read */
- ssize_t buflen = buf.len;
-
- if (stream->read_cb) {
- stream->read_cb(stream, nread, buf);
- } else {
- assert(stream->read2_cb);
-
- /*
- * XXX: Some implementations can send multiple file descriptors in a
- * single message. We should be using CMSG_NXTHDR() to walk the
- * chain to get at them all. This would require changing the API to
- * hand these back up the caller, is a pain.
- */
-
- for (cmsg = CMSG_FIRSTHDR(&msg);
- msg.msg_controllen > 0 && cmsg != NULL;
- cmsg = CMSG_NXTHDR(&msg, cmsg)) {
-
- if (cmsg->cmsg_type == SCM_RIGHTS) {
- if (stream->accepted_fd != -1) {
- fprintf(stderr, "(libuv) ignoring extra FD received\n");
- }
-
- /* silence aliasing warning */
- {
- void* pv = CMSG_DATA(cmsg);
- int* pi = pv;
- stream->accepted_fd = *pi;
- }
-
- } else {
- fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
- cmsg->cmsg_type);
- }
- }
-
-
- if (stream->accepted_fd >= 0) {
- stream->read2_cb((uv_pipe_t*)stream, nread, buf,
- uv__handle_type(stream->accepted_fd));
- } else {
- stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE);
- }
- }
-
- /* Return if we didn't fill the buffer, there is no more data to read. */
- if (nread < buflen) {
- stream->flags |= UV_STREAM_READ_PARTIAL;
- return;
- }
- }
- }
-}
-
-
-int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
- assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
- "uv_shutdown (unix) only supports uv_handle_t right now");
- assert(uv__stream_fd(stream) >= 0);
-
- if (!(stream->flags & UV_STREAM_WRITABLE) ||
- stream->flags & UV_STREAM_SHUT ||
- stream->flags & UV_CLOSED ||
- stream->flags & UV_CLOSING) {
- return -ENOTCONN;
- }
-
- /* Initialize request */
- uv__req_init(stream->loop, req, UV_SHUTDOWN);
- req->handle = stream;
- req->cb = cb;
- stream->shutdown_req = req;
- stream->flags |= UV_STREAM_SHUTTING;
-
- uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
-
- return 0;
-}
-
-
-static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
- uv_stream_t* stream;
-
- stream = container_of(w, uv_stream_t, io_watcher);
-
- assert(stream->type == UV_TCP ||
- stream->type == UV_NAMED_PIPE ||
- stream->type == UV_TTY);
- assert(!(stream->flags & UV_CLOSING));
-
- if (stream->connect_req) {
- uv__stream_connect(stream);
- return;
- }
-
- assert(uv__stream_fd(stream) >= 0);
-
- /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */
- if (events & (UV__POLLIN | UV__POLLERR))
- uv__read(stream);
-
- if (uv__stream_fd(stream) == -1)
- return; /* read_cb closed stream. */
-
- /* Short-circuit iff POLLHUP is set, the user is still interested in read
- * events and uv__read() reported a partial read but not EOF. If the EOF
- * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
- * have to do anything. If the partial read flag is not set, we can't
- * report the EOF yet because there is still data to read.
- */
- if ((events & UV__POLLHUP) &&
- (stream->flags & UV_STREAM_READING) &&
- (stream->flags & UV_STREAM_READ_PARTIAL) &&
- !(stream->flags & UV_STREAM_READ_EOF)) {
- uv_buf_t buf = { NULL, 0 };
- uv__stream_eof(stream, buf);
- }
-
- if (uv__stream_fd(stream) == -1)
- return; /* read_cb closed stream. */
-
- if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) {
- uv__write(stream);
- uv__write_callbacks(stream);
- }
-}
-
-
-/**
- * We get called here from directly following a call to connect(2).
- * In order to determine if we've errored out or succeeded must call
- * getsockopt.
- */
-static void uv__stream_connect(uv_stream_t* stream) {
- int error;
- uv_connect_t* req = stream->connect_req;
- socklen_t errorsize = sizeof(int);
-
- assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
- assert(req);
-
- if (stream->delayed_error) {
- /* To smooth over the differences between unixes errors that
- * were reported synchronously on the first connect can be delayed
- * until the next tick--which is now.
- */
- error = stream->delayed_error;
- stream->delayed_error = 0;
- } else {
- /* Normal situation: we need to get the socket error from the kernel. */
- assert(uv__stream_fd(stream) >= 0);
- getsockopt(uv__stream_fd(stream),
- SOL_SOCKET,
- SO_ERROR,
- &error,
- &errorsize);
- error = -error;
- }
-
- if (error == -EINPROGRESS)
- return;
-
- stream->connect_req = NULL;
- uv__req_unregister(stream->loop, req);
- uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
-
- if (req->cb)
- req->cb(req, error);
-}
-
-
-int uv_write2(uv_write_t* req,
- uv_stream_t* stream,
- uv_buf_t bufs[],
- int bufcnt,
- uv_stream_t* send_handle,
- uv_write_cb cb) {
- int empty_queue;
-
- assert(bufcnt > 0);
- assert((stream->type == UV_TCP ||
- stream->type == UV_NAMED_PIPE ||
- stream->type == UV_TTY) &&
- "uv_write (unix) does not yet support other types of streams");
-
- if (uv__stream_fd(stream) < 0)
- return -EBADF;
-
- if (send_handle) {
- if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
- return -EINVAL;
-
- /* XXX We abuse uv_write2() to send over UDP handles to child processes.
- * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
- * evaluates to a function that operates on a uv_stream_t with a couple of
- * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
- * which works but only by accident.
- */
- if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
- return -EBADF;
- }
-
- /* It's legal for write_queue_size > 0 even when the write_queue is empty;
- * it means there are error-state requests in the write_completed_queue that
- * will touch up write_queue_size later, see also uv__write_req_finish().
- * We chould check that write_queue is empty instead but that implies making
- * a write() syscall when we know that the handle is in error mode.
- */
- empty_queue = (stream->write_queue_size == 0);
-
- /* Initialize the req */
- uv__req_init(stream->loop, req, UV_WRITE);
- req->cb = cb;
- req->handle = stream;
- req->error = 0;
- req->send_handle = send_handle;
- QUEUE_INIT(&req->queue);
-
- if (bufcnt <= (int) ARRAY_SIZE(req->bufsml))
- req->bufs = req->bufsml;
- else
- req->bufs = malloc(sizeof(uv_buf_t) * bufcnt);
-
- memcpy(req->bufs, bufs, bufcnt * sizeof(uv_buf_t));
- req->bufcnt = bufcnt;
- req->write_index = 0;
- stream->write_queue_size += uv__buf_count(bufs, bufcnt);
-
- /* Append the request to write_queue. */
- QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
-
- /* If the queue was empty when this function began, we should attempt to
- * do the write immediately. Otherwise start the write_watcher and wait
- * for the fd to become writable.
- */
- if (stream->connect_req) {
- /* Still connecting, do nothing. */
- }
- else if (empty_queue) {
- uv__write(stream);
- }
- else {
- /*
- * blocking streams should never have anything in the queue.
- * if this assert fires then somehow the blocking stream isn't being
- * sufficiently flushed in uv__write.
- */
- assert(!(stream->flags & UV_STREAM_BLOCKING));
- uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
- }
-
- return 0;
-}
-
-
-/* The buffers to be written must remain valid until the callback is called.
- * This is not required for the uv_buf_t array.
- */
-int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
- uv_write_cb cb) {
- return uv_write2(req, stream, bufs, bufcnt, NULL, cb);
-}
-
-
-static int uv__read_start_common(uv_stream_t* stream,
- uv_alloc_cb alloc_cb,
- uv_read_cb read_cb,
- uv_read2_cb read2_cb) {
- assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
- stream->type == UV_TTY);
-
- if (stream->flags & UV_CLOSING)
- return -EINVAL;
-
- /* The UV_STREAM_READING flag is irrelevant of the state of the tcp - it just
- * expresses the desired state of the user.
- */
- stream->flags |= UV_STREAM_READING;
-
-#if defined(__APPLE__)
- /* Notify select() thread about state change */
- if (stream->select != NULL)
- uv__stream_osx_interrupt_select(stream);
-#endif /* defined(__APPLE__) */
-
- /* TODO: try to do the read inline? */
- /* TODO: keep track of tcp state. If we've gotten a EOF then we should
- * not start the IO watcher.
- */
- assert(uv__stream_fd(stream) >= 0);
- assert(alloc_cb);
-
- stream->read_cb = read_cb;
- stream->read2_cb = read2_cb;
- stream->alloc_cb = alloc_cb;
-
- uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
- uv__handle_start(stream);
-
- return 0;
-}
-
-
-int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
- uv_read_cb read_cb) {
- return uv__read_start_common(stream, alloc_cb, read_cb, NULL);
-}
-
-
-int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
- uv_read2_cb read_cb) {
- return uv__read_start_common(stream, alloc_cb, NULL, read_cb);
-}
-
-
-int uv_read_stop(uv_stream_t* stream) {
- /* Sanity check. We're going to stop the handle unless it's primed for
- * writing but that means there should be some kind of write action in
- * progress.
- */
- assert(!uv__io_active(&stream->io_watcher, UV__POLLOUT) ||
- !QUEUE_EMPTY(&stream->write_completed_queue) ||
- !QUEUE_EMPTY(&stream->write_queue) ||
- stream->shutdown_req != NULL ||
- stream->connect_req != NULL);
-
- stream->flags &= ~UV_STREAM_READING;
- uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
- if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
- uv__handle_stop(stream);
-
-#if defined(__APPLE__)
- /* Notify select() thread about state change */
- if (stream->select != NULL)
- uv__stream_osx_interrupt_select(stream);
-#endif /* defined(__APPLE__) */
-
- stream->read_cb = NULL;
- stream->read2_cb = NULL;
- stream->alloc_cb = NULL;
- return 0;
-}
-
-
-int uv_is_readable(const uv_stream_t* stream) {
- return stream->flags & UV_STREAM_READABLE;
-}
-
-
-int uv_is_writable(const uv_stream_t* stream) {
- return stream->flags & UV_STREAM_WRITABLE;
-}
-
-
-#if defined(__APPLE__)
-int uv___stream_fd(uv_stream_t* handle) {
- uv__stream_select_t* s;
-
- assert(handle->type == UV_TCP ||
- handle->type == UV_TTY ||
- handle->type == UV_NAMED_PIPE);
-
- s = handle->select;
- if (s != NULL)
- return s->fd;
-
- return handle->io_watcher.fd;
-}
-#endif /* defined(__APPLE__) */
-
-
-void uv__stream_close(uv_stream_t* handle) {
-#if defined(__APPLE__)
- /* Terminate select loop first */
- if (handle->select != NULL) {
- uv__stream_select_t* s;
-
- s = handle->select;
-
- uv_sem_post(&s->close_sem);
- uv_sem_post(&s->async_sem);
- uv__stream_osx_interrupt_select(handle);
- uv_thread_join(&s->thread);
- uv_sem_destroy(&s->close_sem);
- uv_sem_destroy(&s->async_sem);
- close(s->fake_fd);
- close(s->int_fd);
- uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
-
- handle->select = NULL;
- }
-#endif /* defined(__APPLE__) */
-
- uv__io_close(handle->loop, &handle->io_watcher);
- uv_read_stop(handle);
- uv__handle_stop(handle);
-
- close(handle->io_watcher.fd);
- handle->io_watcher.fd = -1;
-
- if (handle->accepted_fd >= 0) {
- close(handle->accepted_fd);
- handle->accepted_fd = -1;
- }
-
- assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
-}
-
-
-int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
- assert(0 && "implement me");
- abort();
- return 0;
-}
+/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to
+ * deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+ * sell copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+ * IN THE SOFTWARE.
+ */
+
+#include "uv.h"
+#include "internal.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <unistd.h>
+#include <limits.h> /* IOV_MAX */
+
+#if defined(__APPLE__)
+# include <sys/event.h>
+# include <sys/time.h>
+# include <sys/select.h>
+
+/* Forward declaration */
+typedef struct uv__stream_select_s uv__stream_select_t;
+
+struct uv__stream_select_s {
+ uv_stream_t* stream;
+ uv_thread_t thread;
+ uv_sem_t close_sem;
+ uv_sem_t async_sem;
+ uv_async_t async;
+ int events;
+ int fake_fd;
+ int int_fd;
+ int fd;
+};
+#endif /* defined(__APPLE__) */
+
+static void uv__stream_connect(uv_stream_t*);
+static void uv__write(uv_stream_t* stream);
+static void uv__read(uv_stream_t* stream);
+static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
+static size_t uv__write_req_size(uv_write_t* req);
+
+
+/* Used by the accept() EMFILE party trick. */
+static int uv__open_cloexec(const char* path, int flags) {
+ int err;
+ int fd;
+
+#if defined(__linux__)
+ fd = open(path, flags | UV__O_CLOEXEC);
+ if (fd != -1)
+ return fd;
+
+ if (errno != EINVAL)
+ return -errno;
+
+ /* O_CLOEXEC not supported. */
+#endif
+
+ fd = open(path, flags);
+ if (fd == -1)
+ return -errno;
+
+ err = uv__cloexec(fd, 1);
+ if (err) {
+ close(fd);
+ return err;
+ }
+
+ return fd;
+}
+
+
+static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
+ size_t total = 0;
+ int i;
+
+ for (i = 0; i < bufcnt; i++) {
+ total += bufs[i].len;
+ }
+
+ return total;
+}
+
+
+void uv__stream_init(uv_loop_t* loop,
+ uv_stream_t* stream,
+ uv_handle_type type) {
+ int err;
+
+ uv__handle_init(loop, (uv_handle_t*)stream, type);
+ stream->read_cb = NULL;
+ stream->read2_cb = NULL;
+ stream->alloc_cb = NULL;
+ stream->close_cb = NULL;
+ stream->connection_cb = NULL;
+ stream->connect_req = NULL;
+ stream->shutdown_req = NULL;
+ stream->accepted_fd = -1;
+ stream->delayed_error = 0;
+ QUEUE_INIT(&stream->write_queue);
+ QUEUE_INIT(&stream->write_completed_queue);
+ stream->write_queue_size = 0;
+
+ if (loop->emfile_fd == -1) {
+ err = uv__open_cloexec("/", O_RDONLY);
+ if (err >= 0)
+ loop->emfile_fd = err;
+ }
+
+#if defined(__APPLE__)
+ stream->select = NULL;
+#endif /* defined(__APPLE_) */
+
+ uv__io_init(&stream->io_watcher, uv__stream_io, -1);
+}
+
+
+#if defined(__APPLE__)
+static void uv__stream_osx_select(void* arg) {
+ uv_stream_t* stream;
+ uv__stream_select_t* s;
+ char buf[1024];
+ fd_set sread;
+ fd_set swrite;
+ int events;
+ int fd;
+ int r;
+ int max_fd;
+
+ stream = arg;
+ s = stream->select;
+ fd = s->fd;
+
+ if (fd > s->int_fd)
+ max_fd = fd;
+ else
+ max_fd = s->int_fd;
+
+ while (1) {
+ /* Terminate on semaphore */
+ if (uv_sem_trywait(&s->close_sem) == 0)
+ break;
+
+ /* Watch fd using select(2) */
+ FD_ZERO(&sread);
+ FD_ZERO(&swrite);
+
+ if (uv_is_readable(stream))
+ FD_SET(fd, &sread);
+ if (uv_is_writable(stream))
+ FD_SET(fd, &swrite);
+ FD_SET(s->int_fd, &sread);
+
+ /* Wait indefinitely for fd events */
+ r = select(max_fd + 1, &sread, &swrite, NULL, NULL);
+ if (r == -1) {
+ if (errno == EINTR)
+ continue;
+
+ /* XXX: Possible?! */
+ abort();
+ }
+
+ /* Ignore timeouts */
+ if (r == 0)
+ continue;
+
+ /* Empty socketpair's buffer in case of interruption */
+ if (FD_ISSET(s->int_fd, &sread))
+ while (1) {
+ r = read(s->int_fd, buf, sizeof(buf));
+
+ if (r == sizeof(buf))
+ continue;
+
+ if (r != -1)
+ break;
+
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ break;
+
+ if (errno == EINTR)
+ continue;
+
+ abort();
+ }
+
+ /* Handle events */
+ events = 0;
+ if (FD_ISSET(fd, &sread))
+ events |= UV__POLLIN;
+ if (FD_ISSET(fd, &swrite))
+ events |= UV__POLLOUT;
+
+ assert(events != 0 || FD_ISSET(s->int_fd, &sread));
+ if (events != 0) {
+ ACCESS_ONCE(int, s->events) = events;
+
+ uv_async_send(&s->async);
+ uv_sem_wait(&s->async_sem);
+
+ /* Should be processed at this stage */
+ assert((s->events == 0) || (stream->flags & UV_CLOSING));
+ }
+ }
+}
+
+
+static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
+ /* Notify select() thread about state change */
+ uv__stream_select_t* s;
+ int r;
+
+ s = stream->select;
+
+ /* Interrupt select() loop
+ * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
+ * emit read event on other side
+ */
+ do
+ r = write(s->fake_fd, "x", 1);
+ while (r == -1 && errno == EINTR);
+
+ assert(r == 1);
+}
+
+
+static void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
+ uv__stream_select_t* s;
+ uv_stream_t* stream;
+ int events;
+
+ s = container_of(handle, uv__stream_select_t, async);
+ stream = s->stream;
+
+ /* Get and reset stream's events */
+ events = s->events;
+ ACCESS_ONCE(int, s->events) = 0;
+ uv_sem_post(&s->async_sem);
+
+ assert(events != 0);
+ assert(events == (events & (UV__POLLIN | UV__POLLOUT)));
+
+ /* Invoke callback on event-loop */
+ if ((events & UV__POLLIN) && uv__io_active(&stream->io_watcher, UV__POLLIN))
+ uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLIN);
+
+ if ((events & UV__POLLOUT) && uv__io_active(&stream->io_watcher, UV__POLLOUT))
+ uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLOUT);
+}
+
+
+static void uv__stream_osx_cb_close(uv_handle_t* async) {
+ uv__stream_select_t* s;
+
+ s = container_of(async, uv__stream_select_t, async);
+ free(s);
+}
+
+
+int uv__stream_try_select(uv_stream_t* stream, int* fd) {
+ /*
+ * kqueue doesn't work with some files from /dev mount on osx.
+ * select(2) in separate thread for those fds
+ */
+
+ struct kevent filter[1];
+ struct kevent events[1];
+ struct timespec timeout;
+ uv__stream_select_t* s;
+ int fds[2];
+ int err;
+ int ret;
+ int kq;
+
+ kq = kqueue();
+ if (kq == -1) {
+ fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", errno);
+ return -errno;
+ }
+
+ EV_SET(&filter[0], *fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
+
+ /* Use small timeout, because we only want to capture EINVALs */
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = 1;
+
+ ret = kevent(kq, filter, 1, events, 1, &timeout);
+ SAVE_ERRNO(close(kq));
+
+ if (ret == -1)
+ return -errno;
+
+ if (ret == 0 || (events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
+ return 0;
+
+ /* At this point we definitely know that this fd won't work with kqueue */
+ s = malloc(sizeof(*s));
+ if (s == NULL)
+ return -ENOMEM;
+
+ s->events = 0;
+ s->fd = *fd;
+
+ err = uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb);
+ if (err) {
+ free(s);
+ return err;
+ }
+
+ s->async.flags |= UV__HANDLE_INTERNAL;
+ uv__handle_unref(&s->async);
+
+ if (uv_sem_init(&s->close_sem, 0))
+ goto fatal1;
+
+ if (uv_sem_init(&s->async_sem, 0))
+ goto fatal2;
+
+ /* Create fds for io watcher and to interrupt the select() loop. */
+ if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
+ goto fatal3;
+
+ s->fake_fd = fds[0];
+ s->int_fd = fds[1];
+
+ if (uv_thread_create(&s->thread, uv__stream_osx_select, stream))
+ goto fatal4;
+
+ s->stream = stream;
+ stream->select = s;
+ *fd = s->fake_fd;
+
+ return 0;
+
+fatal4:
+ close(s->fake_fd);
+ close(s->int_fd);
+ s->fake_fd = -1;
+ s->int_fd = -1;
+fatal3:
+ uv_sem_destroy(&s->async_sem);
+fatal2:
+ uv_sem_destroy(&s->close_sem);
+fatal1:
+ uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
+ return -errno;
+}
+#endif /* defined(__APPLE__) */
+
+
+int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
+ assert(fd >= 0);
+ stream->flags |= flags;
+
+ if (stream->type == UV_TCP) {
+ if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
+ return -errno;
+
+ /* TODO Use delay the user passed in. */
+ if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60))
+ return -errno;
+ }
+
+ stream->io_watcher.fd = fd;
+
+ return 0;
+}
+
+
+void uv__stream_destroy(uv_stream_t* stream) {
+ uv_write_t* req;
+ QUEUE* q;
+
+ assert(!uv__io_active(&stream->io_watcher, UV__POLLIN | UV__POLLOUT));
+ assert(stream->flags & UV_CLOSED);
+
+ if (stream->connect_req) {
+ uv__req_unregister(stream->loop, stream->connect_req);
+ stream->connect_req->cb(stream->connect_req, -ECANCELED);
+ stream->connect_req = NULL;
+ }
+
+ while (!QUEUE_EMPTY(&stream->write_queue)) {
+ q = QUEUE_HEAD(&stream->write_queue);
+ QUEUE_REMOVE(q);
+
+ req = QUEUE_DATA(q, uv_write_t, queue);
+ uv__req_unregister(stream->loop, req);
+
+ if (req->bufs != req->bufsml)
+ free(req->bufs);
+ req->bufs = NULL;
+
+ if (req->cb != NULL)
+ req->cb(req, -ECANCELED);
+ }
+
+ while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
+ q = QUEUE_HEAD(&stream->write_completed_queue);
+ QUEUE_REMOVE(q);
+
+ req = QUEUE_DATA(q, uv_write_t, queue);
+ uv__req_unregister(stream->loop, req);
+
+ if (req->bufs != NULL) {
+ stream->write_queue_size -= uv__write_req_size(req);
+ if (req->bufs != req->bufsml)
+ free(req->bufs);
+ req->bufs = NULL;
+ }
+
+ if (req->cb)
+ req->cb(req, req->error);
+ }
+
+ if (stream->shutdown_req) {
+ /* The ECANCELED error code is a lie, the shutdown(2) syscall is a
+ * fait accompli at this point. Maybe we should revisit this in v0.11.
+ * A possible reason for leaving it unchanged is that it informs the
+ * callee that the handle has been destroyed.
+ */
+ uv__req_unregister(stream->loop, stream->shutdown_req);
+ stream->shutdown_req->cb(stream->shutdown_req, -ECANCELED);
+ stream->shutdown_req = NULL;
+ }
+}
+
+
+/* Implements a best effort approach to mitigating accept() EMFILE errors.
+ * We have a spare file descriptor stashed away that we close to get below
+ * the EMFILE limit. Next, we accept all pending connections and close them
+ * immediately to signal the clients that we're overloaded - and we are, but
+ * we still keep on trucking.
+ *
+ * There is one caveat: it's not reliable in a multi-threaded environment.
+ * The file descriptor limit is per process. Our party trick fails if another
+ * thread opens a file or creates a socket in the time window between us
+ * calling close() and accept().
+ */
+static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
+ int fd;
+
+ if (loop->emfile_fd == -1)
+ return -EMFILE;
+
+ close(loop->emfile_fd);
+
+ for (;;) {
+ fd = uv__accept(accept_fd);
+
+ if (fd != -1) {
+ close(fd);
+ continue;
+ }
+
+ if (errno == EINTR)
+ continue;
+
+ SAVE_ERRNO(loop->emfile_fd = uv__open_cloexec("/", O_RDONLY));
+ return -errno;
+ }
+}
+
+
+#if defined(UV_HAVE_KQUEUE)
+# define UV_DEC_BACKLOG(w) w->rcount--;
+#else
+# define UV_DEC_BACKLOG(w) /* no-op */
+#endif /* defined(UV_HAVE_KQUEUE) */
+
+
+void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
+ uv_stream_t* stream;
+ int err;
+
+ stream = container_of(w, uv_stream_t, io_watcher);
+ assert(events == UV__POLLIN);
+ assert(stream->accepted_fd == -1);
+ assert(!(stream->flags & UV_CLOSING));
+
+ uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
+
+ /* connection_cb can close the server socket while we're
+ * in the loop so check it on each iteration.
+ */
+ while (uv__stream_fd(stream) != -1) {
+ assert(stream->accepted_fd == -1);
+
+#if defined(UV_HAVE_KQUEUE)
+ if (w->rcount <= 0)
+ return;
+#endif /* defined(UV_HAVE_KQUEUE) */
+
+ err = uv__accept(uv__stream_fd(stream));
+ if (err < 0) {
+ if (err == -EAGAIN || err == -EWOULDBLOCK)
+ return; /* Not an error. */
+
+ if (err == -ECONNABORTED)
+ continue; /* Ignore. Nothing we can do about that. */
+
+ if (err == -EMFILE || err == -ENFILE) {
+ err = uv__emfile_trick(loop, uv__stream_fd(stream));
+ if (err == -EAGAIN || err == -EWOULDBLOCK)
+ break;
+ }
+
+ stream->connection_cb(stream, err);
+ continue;
+ }
+
+ UV_DEC_BACKLOG(w)
+ stream->accepted_fd = err;
+ stream->connection_cb(stream, 0);
+
+ if (stream->accepted_fd != -1) {
+ /* The user hasn't yet accepted called uv_accept() */
+ uv__io_stop(loop, &stream->io_watcher, UV__POLLIN);
+ return;
+ }
+
+ if (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)) {
+ /* Give other processes a chance to accept connections. */
+ struct timespec timeout = { 0, 1 };
+ nanosleep(&timeout, NULL);
+ }
+ }
+}
+
+
+#undef UV_DEC_BACKLOG
+
+
+int uv_accept(uv_stream_t* server, uv_stream_t* client) {
+ int err;
+
+ /* TODO document this */
+ assert(server->loop == client->loop);
+
+ if (server->accepted_fd == -1)
+ return -EAGAIN;
+
+ switch (client->type) {
+ case UV_NAMED_PIPE:
+ case UV_TCP:
+ err = uv__stream_open(client,
+ server->accepted_fd,
+ UV_STREAM_READABLE | UV_STREAM_WRITABLE);
+ if (err) {
+ /* TODO handle error */
+ close(server->accepted_fd);
+ server->accepted_fd = -1;
+ return err;
+ }
+ break;
+
+ case UV_UDP:
+ err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
+ if (err) {
+ close(server->accepted_fd);
+ server->accepted_fd = -1;
+ return err;
+ }
+ break;
+
+ default:
+ assert(0);
+ }
+
+ uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
+ server->accepted_fd = -1;
+ return 0;
+}
+
+
+int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
+ int err;
+
+ err = -EINVAL;
+ switch (stream->type) {
+ case UV_TCP:
+ err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
+ break;
+
+ case UV_NAMED_PIPE:
+ err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
+ break;
+
+ default:
+ assert(0);
+ }
+
+ if (err == 0)
+ uv__handle_start(stream);
+
+ return err;
+}
+
+
+static void uv__drain(uv_stream_t* stream) {
+ uv_shutdown_t* req;
+ int err;
+
+ assert(QUEUE_EMPTY(&stream->write_queue));
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
+
+ /* Shutdown? */
+ if ((stream->flags & UV_STREAM_SHUTTING) &&
+ !(stream->flags & UV_CLOSING) &&
+ !(stream->flags & UV_STREAM_SHUT)) {
+ assert(stream->shutdown_req);
+
+ req = stream->shutdown_req;
+ stream->shutdown_req = NULL;
+ stream->flags &= ~UV_STREAM_SHUTTING;
+ uv__req_unregister(stream->loop, req);
+
+ err = 0;
+ if (shutdown(uv__stream_fd(stream), SHUT_WR))
+ err = -errno;
+
+ if (err == 0)
+ stream->flags |= UV_STREAM_SHUT;
+
+ if (req->cb != NULL)
+ req->cb(req, err);
+ }
+}
+
+
+static size_t uv__write_req_size(uv_write_t* req) {
+ size_t size;
+
+ assert(req->bufs != NULL);
+ size = uv__buf_count(req->bufs + req->write_index,
+ req->bufcnt - req->write_index);
+ assert(req->handle->write_queue_size >= size);
+
+ return size;
+}
+
+
+static void uv__write_req_finish(uv_write_t* req) {
+ uv_stream_t* stream = req->handle;
+
+ /* Pop the req off tcp->write_queue. */
+ QUEUE_REMOVE(&req->queue);
+
+ /* Only free when there was no error. On error, we touch up write_queue_size
+ * right before making the callback. The reason we don't do that right away
+ * is that a write_queue_size > 0 is our only way to signal to the user that
+ * he should stop writing - which he should if we got an error. Something to
+ * revisit in future revisions of the libuv API.
+ */
+ if (req->error == 0) {
+ if (req->bufs != req->bufsml)
+ free(req->bufs);
+ req->bufs = NULL;
+ }
+
+ /* Add it to the write_completed_queue where it will have its
+ * callback called in the near future.
+ */
+ QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue);
+ uv__io_feed(stream->loop, &stream->io_watcher);
+}
+
+
+static int uv__handle_fd(uv_handle_t* handle) {
+ switch (handle->type) {
+ case UV_NAMED_PIPE:
+ case UV_TCP:
+ return ((uv_stream_t*) handle)->io_watcher.fd;
+
+ case UV_UDP:
+ return ((uv_udp_t*) handle)->io_watcher.fd;
+
+ default:
+ return -1;
+ }
+}
+
+static int uv__getiovmax() {
+#if defined(IOV_MAX)
+ return IOV_MAX;
+#elif defined(_SC_IOV_MAX)
+ static int iovmax = -1;
+ if (iovmax == -1)
+ iovmax = sysconf(_SC_IOV_MAX);
+ return iovmax;
+#else
+ return 1024;
+#endif
+}
+
+static void uv__write(uv_stream_t* stream) {
+ struct iovec* iov;
+ QUEUE* q;
+ uv_write_t* req;
+ int iovmax;
+ int iovcnt;
+ ssize_t n;
+
+start:
+
+ assert(uv__stream_fd(stream) >= 0);
+
+ if (QUEUE_EMPTY(&stream->write_queue))
+ return;
+
+ q = QUEUE_HEAD(&stream->write_queue);
+ req = QUEUE_DATA(q, uv_write_t, queue);
+ assert(req->handle == stream);
+
+ /*
+ * Cast to iovec. We had to have our own uv_buf_t instead of iovec
+ * because Windows's WSABUF is not an iovec.
+ */
+ assert(sizeof(uv_buf_t) == sizeof(struct iovec));
+ iov = (struct iovec*) &(req->bufs[req->write_index]);
+ iovcnt = req->bufcnt - req->write_index;
+
+ iovmax = uv__getiovmax();
+
+ /* Limit iov count to avoid EINVALs from writev() */
+ if (iovcnt > iovmax)
+ iovcnt = iovmax;
+
+ /*
+ * Now do the actual writev. Note that we've been updating the pointers
+ * inside the iov each time we write. So there is no need to offset it.
+ */
+
+ if (req->send_handle) {
+ struct msghdr msg;
+ char scratch[64];
+ struct cmsghdr *cmsg;
+ int fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
+
+ assert(fd_to_send >= 0);
+
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ msg.msg_iov = iov;
+ msg.msg_iovlen = iovcnt;
+ msg.msg_flags = 0;
+
+ msg.msg_control = (void*) scratch;
+ msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
+
+ cmsg = CMSG_FIRSTHDR(&msg);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = msg.msg_controllen;
+
+ /* silence aliasing warning */
+ {
+ void* pv = CMSG_DATA(cmsg);
+ int* pi = pv;
+ *pi = fd_to_send;
+ }
+
+ do {
+ n = sendmsg(uv__stream_fd(stream), &msg, 0);
+ }
+ while (n == -1 && errno == EINTR);
+ } else {
+ do {
+ if (iovcnt == 1) {
+ n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
+ } else {
+ n = writev(uv__stream_fd(stream), iov, iovcnt);
+ }
+ }
+ while (n == -1 && errno == EINTR);
+ }
+
+ if (n < 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ /* Error */
+ req->error = -errno;
+ uv__write_req_finish(req);
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
+ if (!uv__io_active(&stream->io_watcher, UV__POLLIN))
+ uv__handle_stop(stream);
+ return;
+ } else if (stream->flags & UV_STREAM_BLOCKING) {
+ /* If this is a blocking stream, try again. */
+ goto start;
+ }
+ } else {
+ /* Successful write */
+
+ while (n >= 0) {
+ uv_buf_t* buf = &(req->bufs[req->write_index]);
+ size_t len = buf->len;
+
+ assert(req->write_index < req->bufcnt);
+
+ if ((size_t)n < len) {
+ buf->base += n;
+ buf->len -= n;
+ stream->write_queue_size -= n;
+ n = 0;
+
+ /* There is more to write. */
+ if (stream->flags & UV_STREAM_BLOCKING) {
+ /*
+ * If we're blocking then we should not be enabling the write
+ * watcher - instead we need to try again.
+ */
+ goto start;
+ } else {
+ /* Break loop and ensure the watcher is pending. */
+ break;
+ }
+
+ } else {
+ /* Finished writing the buf at index req->write_index. */
+ req->write_index++;
+
+ assert((size_t)n >= len);
+ n -= len;
+
+ assert(stream->write_queue_size >= len);
+ stream->write_queue_size -= len;
+
+ if (req->write_index == req->bufcnt) {
+ /* Then we're done! */
+ assert(n == 0);
+ uv__write_req_finish(req);
+ /* TODO: start trying to write the next request. */
+ return;
+ }
+ }
+ }
+ }
+
+ /* Either we've counted n down to zero or we've got EAGAIN. */
+ assert(n == 0 || n == -1);
+
+ /* Only non-blocking streams should use the write_watcher. */
+ assert(!(stream->flags & UV_STREAM_BLOCKING));
+
+ /* We're not done. */
+ uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
+}
+
+
+static void uv__write_callbacks(uv_stream_t* stream) {
+ uv_write_t* req;
+ QUEUE* q;
+
+ while (!QUEUE_EMPTY(&stream->write_completed_queue)) {
+ /* Pop a req off write_completed_queue. */
+ q = QUEUE_HEAD(&stream->write_completed_queue);
+ req = QUEUE_DATA(q, uv_write_t, queue);
+ QUEUE_REMOVE(q);
+ uv__req_unregister(stream->loop, req);
+
+ if (req->bufs != NULL) {
+ stream->write_queue_size -= uv__write_req_size(req);
+ if (req->bufs != req->bufsml)
+ free(req->bufs);
+ req->bufs = NULL;
+ }
+
+ /* NOTE: call callback AFTER freeing the request data. */
+ if (req->cb)
+ req->cb(req, req->error);
+ }
+
+ assert(QUEUE_EMPTY(&stream->write_completed_queue));
+
+ /* Write queue drained. */
+ if (QUEUE_EMPTY(&stream->write_queue))
+ uv__drain(stream);
+}
+
+
+static uv_handle_type uv__handle_type(int fd) {
+ struct sockaddr_storage ss;
+ socklen_t len;
+ int type;
+
+ memset(&ss, 0, sizeof(ss));
+ len = sizeof(ss);
+
+ if (getsockname(fd, (struct sockaddr*)&ss, &len))
+ return UV_UNKNOWN_HANDLE;
+
+ len = sizeof type;
+
+ if (getsockopt(fd, SOL_SOCKET, SO_TYPE, &type, &len))
+ return UV_UNKNOWN_HANDLE;
+
+ if (type == SOCK_STREAM) {
+ switch (ss.ss_family) {
+ case AF_UNIX:
+ return UV_NAMED_PIPE;
+ case AF_INET:
+ case AF_INET6:
+ return UV_TCP;
+ }
+ }
+
+ if (type == SOCK_DGRAM &&
+ (ss.ss_family == AF_INET || ss.ss_family == AF_INET6))
+ return UV_UDP;
+
+ return UV_UNKNOWN_HANDLE;
+}
+
+
+static void uv__stream_read_cb(uv_stream_t* stream,
+ int status,
+ uv_buf_t buf,
+ uv_handle_type type) {
+ if (stream->read_cb != NULL)
+ stream->read_cb(stream, status, buf);
+ else
+ stream->read2_cb((uv_pipe_t*) stream, status, buf, type);
+}
+
+
+static void uv__stream_eof(uv_stream_t* stream, uv_buf_t buf) {
+ stream->flags |= UV_STREAM_READ_EOF;
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
+ if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
+ uv__handle_stop(stream);
+ uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE);
+}
+
+
+static void uv__read(uv_stream_t* stream) {
+ uv_buf_t buf;
+ ssize_t nread;
+ struct msghdr msg;
+ struct cmsghdr* cmsg;
+ char cmsg_space[64];
+ int count;
+
+ stream->flags &= ~UV_STREAM_READ_PARTIAL;
+
+ /* Prevent loop starvation when the data comes in as fast as (or faster than)
+ * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O.
+ */
+ count = 32;
+
+ /* XXX: Maybe instead of having UV_STREAM_READING we just test if
+ * tcp->read_cb is NULL or not?
+ */
+ while ((stream->read_cb || stream->read2_cb)
+ && (stream->flags & UV_STREAM_READING)
+ && (count-- > 0)) {
+ assert(stream->alloc_cb != NULL);
+
+ buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
+ if (buf.len == 0) {
+ /* User indicates it can't or won't handle the read. */
+ uv__stream_read_cb(stream, UV_ENOBUFS, buf, UV_UNKNOWN_HANDLE);
+ return;
+ }
+
+ assert(buf.base != NULL);
+ assert(uv__stream_fd(stream) >= 0);
+
+ if (stream->read_cb) {
+ do {
+ nread = read(uv__stream_fd(stream), buf.base, buf.len);
+ }
+ while (nread < 0 && errno == EINTR);
+ } else {
+ assert(stream->read2_cb);
+ /* read2_cb uses recvmsg */
+ msg.msg_flags = 0;
+ msg.msg_iov = (struct iovec*) &buf;
+ msg.msg_iovlen = 1;
+ msg.msg_name = NULL;
+ msg.msg_namelen = 0;
+ /* Set up to receive a descriptor even if one isn't in the message */
+ msg.msg_controllen = 64;
+ msg.msg_control = (void*) cmsg_space;
+
+ do {
+ nread = recvmsg(uv__stream_fd(stream), &msg, 0);
+ }
+ while (nread < 0 && errno == EINTR);
+ }
+
+ if (nread < 0) {
+ /* Error */
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ /* Wait for the next one. */
+ if (stream->flags & UV_STREAM_READING) {
+ uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
+ }
+ uv__stream_read_cb(stream, 0, buf, UV_UNKNOWN_HANDLE);
+ } else {
+ /* Error. User should call uv_close(). */
+ uv__stream_read_cb(stream, -errno, buf, UV_UNKNOWN_HANDLE);
+ assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) &&
+ "stream->read_cb(status=-1) did not call uv_close()");
+ }
+ return;
+ } else if (nread == 0) {
+ uv__stream_eof(stream, buf);
+ return;
+ } else {
+ /* Successful read */
+ ssize_t buflen = buf.len;
+
+ if (stream->read_cb) {
+ stream->read_cb(stream, nread, buf);
+ } else {
+ assert(stream->read2_cb);
+
+ /*
+ * XXX: Some implementations can send multiple file descriptors in a
+ * single message. We should be using CMSG_NXTHDR() to walk the
+ * chain to get at them all. This would require changing the API to
+ * hand these back up the caller, is a pain.
+ */
+
+ for (cmsg = CMSG_FIRSTHDR(&msg);
+ msg.msg_controllen > 0 && cmsg != NULL;
+ cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+
+ if (cmsg->cmsg_type == SCM_RIGHTS) {
+ if (stream->accepted_fd != -1) {
+ fprintf(stderr, "(libuv) ignoring extra FD received\n");
+ }
+
+ /* silence aliasing warning */
+ {
+ void* pv = CMSG_DATA(cmsg);
+ int* pi = pv;
+ stream->accepted_fd = *pi;
+ }
+
+ } else {
+ fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
+ cmsg->cmsg_type);
+ }
+ }
+
+
+ if (stream->accepted_fd >= 0) {
+ stream->read2_cb((uv_pipe_t*)stream, nread, buf,
+ uv__handle_type(stream->accepted_fd));
+ } else {
+ stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE);
+ }
+ }
+
+ /* Return if we didn't fill the buffer, there is no more data to read. */
+ if (nread < buflen) {
+ stream->flags |= UV_STREAM_READ_PARTIAL;
+ return;
+ }
+ }
+ }
+}
+
+
+int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
+ assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
+ "uv_shutdown (unix) only supports uv_handle_t right now");
+ assert(uv__stream_fd(stream) >= 0);
+
+ if (!(stream->flags & UV_STREAM_WRITABLE) ||
+ stream->flags & UV_STREAM_SHUT ||
+ stream->flags & UV_CLOSED ||
+ stream->flags & UV_CLOSING) {
+ return -ENOTCONN;
+ }
+
+ /* Initialize request */
+ uv__req_init(stream->loop, req, UV_SHUTDOWN);
+ req->handle = stream;
+ req->cb = cb;
+ stream->shutdown_req = req;
+ stream->flags |= UV_STREAM_SHUTTING;
+
+ uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
+
+ return 0;
+}
+
+
+static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
+ uv_stream_t* stream;
+
+ stream = container_of(w, uv_stream_t, io_watcher);
+
+ assert(stream->type == UV_TCP ||
+ stream->type == UV_NAMED_PIPE ||
+ stream->type == UV_TTY);
+ assert(!(stream->flags & UV_CLOSING));
+
+ if (stream->connect_req) {
+ uv__stream_connect(stream);
+ return;
+ }
+
+ assert(uv__stream_fd(stream) >= 0);
+
+ /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */
+ if (events & (UV__POLLIN | UV__POLLERR))
+ uv__read(stream);
+
+ if (uv__stream_fd(stream) == -1)
+ return; /* read_cb closed stream. */
+
+ /* Short-circuit iff POLLHUP is set, the user is still interested in read
+ * events and uv__read() reported a partial read but not EOF. If the EOF
+ * flag is set, uv__read() called read_cb with err=UV_EOF and we don't
+ * have to do anything. If the partial read flag is not set, we can't
+ * report the EOF yet because there is still data to read.
+ */
+ if ((events & UV__POLLHUP) &&
+ (stream->flags & UV_STREAM_READING) &&
+ (stream->flags & UV_STREAM_READ_PARTIAL) &&
+ !(stream->flags & UV_STREAM_READ_EOF)) {
+ uv_buf_t buf = { NULL, 0 };
+ uv__stream_eof(stream, buf);
+ }
+
+ if (uv__stream_fd(stream) == -1)
+ return; /* read_cb closed stream. */
+
+ if (events & (UV__POLLOUT | UV__POLLERR | UV__POLLHUP)) {
+ uv__write(stream);
+ uv__write_callbacks(stream);
+ }
+}
+
+
+/**
+ * We get called here from directly following a call to connect(2).
+ * In order to determine if we've errored out or succeeded must call
+ * getsockopt.
+ */
+static void uv__stream_connect(uv_stream_t* stream) {
+ int error;
+ uv_connect_t* req = stream->connect_req;
+ socklen_t errorsize = sizeof(int);
+
+ assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
+ assert(req);
+
+ if (stream->delayed_error) {
+ /* To smooth over the differences between unixes errors that
+ * were reported synchronously on the first connect can be delayed
+ * until the next tick--which is now.
+ */
+ error = stream->delayed_error;
+ stream->delayed_error = 0;
+ } else {
+ /* Normal situation: we need to get the socket error from the kernel. */
+ assert(uv__stream_fd(stream) >= 0);
+ getsockopt(uv__stream_fd(stream),
+ SOL_SOCKET,
+ SO_ERROR,
+ &error,
+ &errorsize);
+ error = -error;
+ }
+
+ if (error == -EINPROGRESS)
+ return;
+
+ stream->connect_req = NULL;
+ uv__req_unregister(stream->loop, req);
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
+
+ if (req->cb)
+ req->cb(req, error);
+}
+
+
+int uv_write2(uv_write_t* req,
+ uv_stream_t* stream,
+ uv_buf_t bufs[],
+ int bufcnt,
+ uv_stream_t* send_handle,
+ uv_write_cb cb) {
+ int empty_queue;
+
+ assert(bufcnt > 0);
+ assert((stream->type == UV_TCP ||
+ stream->type == UV_NAMED_PIPE ||
+ stream->type == UV_TTY) &&
+ "uv_write (unix) does not yet support other types of streams");
+
+ if (uv__stream_fd(stream) < 0)
+ return -EBADF;
+
+ if (send_handle) {
+ if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
+ return -EINVAL;
+
+ /* XXX We abuse uv_write2() to send over UDP handles to child processes.
+ * Don't call uv__stream_fd() on those handles, it's a macro that on OS X
+ * evaluates to a function that operates on a uv_stream_t with a couple of
+ * OS X specific fields. On other Unices it does (handle)->io_watcher.fd,
+ * which works but only by accident.
+ */
+ if (uv__handle_fd((uv_handle_t*) send_handle) < 0)
+ return -EBADF;
+ }
+
+ /* It's legal for write_queue_size > 0 even when the write_queue is empty;
+ * it means there are error-state requests in the write_completed_queue that
+ * will touch up write_queue_size later, see also uv__write_req_finish().
+ * We chould check that write_queue is empty instead but that implies making
+ * a write() syscall when we know that the handle is in error mode.
+ */
+ empty_queue = (stream->write_queue_size == 0);
+
+ /* Initialize the req */
+ uv__req_init(stream->loop, req, UV_WRITE);
+ req->cb = cb;
+ req->handle = stream;
+ req->error = 0;
+ req->send_handle = send_handle;
+ QUEUE_INIT(&req->queue);
+
+ if (bufcnt <= (int) ARRAY_SIZE(req->bufsml))
+ req->bufs = req->bufsml;
+ else
+ req->bufs = malloc(sizeof(uv_buf_t) * bufcnt);
+
+ memcpy(req->bufs, bufs, bufcnt * sizeof(uv_buf_t));
+ req->bufcnt = bufcnt;
+ req->write_index = 0;
+ stream->write_queue_size += uv__buf_count(bufs, bufcnt);
+
+ /* Append the request to write_queue. */
+ QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue);
+
+ /* If the queue was empty when this function began, we should attempt to
+ * do the write immediately. Otherwise start the write_watcher and wait
+ * for the fd to become writable.
+ */
+ if (stream->connect_req) {
+ /* Still connecting, do nothing. */
+ }
+ else if (empty_queue) {
+ uv__write(stream);
+ }
+ else {
+ /*
+ * blocking streams should never have anything in the queue.
+ * if this assert fires then somehow the blocking stream isn't being
+ * sufficiently flushed in uv__write.
+ */
+ assert(!(stream->flags & UV_STREAM_BLOCKING));
+ uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
+ }
+
+ return 0;
+}
+
+
+/* The buffers to be written must remain valid until the callback is called.
+ * This is not required for the uv_buf_t array.
+ */
+int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
+ uv_write_cb cb) {
+ return uv_write2(req, stream, bufs, bufcnt, NULL, cb);
+}
+
+
+static int uv__read_start_common(uv_stream_t* stream,
+ uv_alloc_cb alloc_cb,
+ uv_read_cb read_cb,
+ uv_read2_cb read2_cb) {
+ assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
+ stream->type == UV_TTY);
+
+ if (stream->flags & UV_CLOSING)
+ return -EINVAL;
+
+ /* The UV_STREAM_READING flag is irrelevant of the state of the tcp - it just
+ * expresses the desired state of the user.
+ */
+ stream->flags |= UV_STREAM_READING;
+
+#if defined(__APPLE__)
+ /* Notify select() thread about state change */
+ if (stream->select != NULL)
+ uv__stream_osx_interrupt_select(stream);
+#endif /* defined(__APPLE__) */
+
+ /* TODO: try to do the read inline? */
+ /* TODO: keep track of tcp state. If we've gotten a EOF then we should
+ * not start the IO watcher.
+ */
+ assert(uv__stream_fd(stream) >= 0);
+ assert(alloc_cb);
+
+ stream->read_cb = read_cb;
+ stream->read2_cb = read2_cb;
+ stream->alloc_cb = alloc_cb;
+
+ uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
+ uv__handle_start(stream);
+
+ return 0;
+}
+
+
+int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
+ uv_read_cb read_cb) {
+ return uv__read_start_common(stream, alloc_cb, read_cb, NULL);
+}
+
+
+int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
+ uv_read2_cb read_cb) {
+ return uv__read_start_common(stream, alloc_cb, NULL, read_cb);
+}
+
+
+int uv_read_stop(uv_stream_t* stream) {
+ /* Sanity check. We're going to stop the handle unless it's primed for
+ * writing but that means there should be some kind of write action in
+ * progress.
+ */
+ assert(!uv__io_active(&stream->io_watcher, UV__POLLOUT) ||
+ !QUEUE_EMPTY(&stream->write_completed_queue) ||
+ !QUEUE_EMPTY(&stream->write_queue) ||
+ stream->shutdown_req != NULL ||
+ stream->connect_req != NULL);
+
+ stream->flags &= ~UV_STREAM_READING;
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
+ if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
+ uv__handle_stop(stream);
+
+#if defined(__APPLE__)
+ /* Notify select() thread about state change */
+ if (stream->select != NULL)
+ uv__stream_osx_interrupt_select(stream);
+#endif /* defined(__APPLE__) */
+
+ stream->read_cb = NULL;
+ stream->read2_cb = NULL;
+ stream->alloc_cb = NULL;
+ return 0;
+}
+
+
+int uv_is_readable(const uv_stream_t* stream) {
+ return stream->flags & UV_STREAM_READABLE;
+}
+
+
+int uv_is_writable(const uv_stream_t* stream) {
+ return stream->flags & UV_STREAM_WRITABLE;
+}
+
+
+#if defined(__APPLE__)
+int uv___stream_fd(uv_stream_t* handle) {
+ uv__stream_select_t* s;
+
+ assert(handle->type == UV_TCP ||
+ handle->type == UV_TTY ||
+ handle->type == UV_NAMED_PIPE);
+
+ s = handle->select;
+ if (s != NULL)
+ return s->fd;
+
+ return handle->io_watcher.fd;
+}
+#endif /* defined(__APPLE__) */
+
+
+void uv__stream_close(uv_stream_t* handle) {
+#if defined(__APPLE__)
+ /* Terminate select loop first */
+ if (handle->select != NULL) {
+ uv__stream_select_t* s;
+
+ s = handle->select;
+
+ uv_sem_post(&s->close_sem);
+ uv_sem_post(&s->async_sem);
+ uv__stream_osx_interrupt_select(handle);
+ uv_thread_join(&s->thread);
+ uv_sem_destroy(&s->close_sem);
+ uv_sem_destroy(&s->async_sem);
+ close(s->fake_fd);
+ close(s->int_fd);
+ uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
+
+ handle->select = NULL;
+ }
+#endif /* defined(__APPLE__) */
+
+ uv__io_close(handle->loop, &handle->io_watcher);
+ uv_read_stop(handle);
+ uv__handle_stop(handle);
+
+ close(handle->io_watcher.fd);
+ handle->io_watcher.fd = -1;
+
+ if (handle->accepted_fd >= 0) {
+ close(handle->accepted_fd);
+ handle->accepted_fd = -1;
+ }
+
+ assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
+}
+
+
+int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
+ assert(0 && "implement me");
+ abort();
+ return 0;
+}