ext/common/BackgroundEventLoop.cpp in passenger-5.0.8 vs ext/common/BackgroundEventLoop.cpp in passenger-5.0.9
- old
+ new
@@ -21,147 +21,329 @@
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <cstdlib>
+#include <cerrno>
+#include <cassert>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
+#include <boost/scoped_ptr.hpp>
#include <oxt/thread.hpp>
-#include <ev++.h>
-#include <eio.h>
+#include <oxt/backtrace.hpp>
+#include <oxt/system_calls.hpp>
+#include <oxt/detail/context.hpp>
+#include <ev.h>
+#include <uv.h>
#include <BackgroundEventLoop.h>
#include <Logging.h>
#include <Exceptions.h>
#include <SafeLibev.h>
+#ifndef HAVE_KQUEUE
+ #if defined(__APPLE__) || \
+ defined(__DragonFly__) || \
+ defined(__FreeBSD__) || \
+ defined(__OpenBSD__) || \
+ defined(__NetBSD__)
+ #define HAVE_KQUEUE 1
+ #endif
+#endif
+
+#ifndef HAVE_EPOLL
+ #ifdef __linux__
+ #define HAVE_EPOLL 1
+ #endif
+#endif
+
+#ifndef HAVE_POLLSET
+ #ifdef __AIX
+ #define HAVE_POLLSET 1
+ #endif
+#endif
+
+#ifndef HAVE_EVENT_PORTS
+ #if defined(sun) || defined(__sun)
+ #define HAVE_EVENT_PORTS
+ #endif
+#endif
+
+#if defined(HAVE_KQUEUE)
+ #include <sys/types.h>
+ #include <sys/event.h>
+ #include <sys/time.h>
+#elif defined(HAVE_EPOLL)
+ #include <sys/epoll.h>
+#elif defined(HAVE_POLLSET)
+ #include <sys/poll.h>
+ #include <sys/pollset.h>
+ #include <sys/fcntl.h>
+#elif defined(HAVE_EVENT_PORTS)
+ #include <port.h>
+#endif
+
+
namespace Passenger {
using namespace std;
using namespace boost;
using namespace oxt;
struct BackgroundEventLoopPrivate {
+ struct ev_async exitSignaller;
+ struct ev_async libuvActivitySignaller;
+ uv_loop_t libuv_loop;
+ /**
+ * Coordinates communication between the libuv poller thread and the
+ * libuv activity callback (the latter which runs on the libevent thread.
+ */
+ uv_sem_t libuv_sem;
+ /**
+ * This timer doesn't do anything. It only exists to prevent
+ * uv_backend_timeout() from returning 0, which would make the
+ * libuv poller thread use 100% CPU.
+ */
+ uv_timer_t libuv_timer;
+
oxt::thread *thr;
- boost::mutex lock;
- boost::condition_variable cond;
- ev_idle eioRepeatWatcher;
- ev_async eioReadyWatcher;
- bool useLibeio;
+ oxt::thread *libuvPollerThr;
+ uv_barrier_t startBarrier;
+
+ bool usesLibuv;
bool started;
};
-static BackgroundEventLoop *eioInstanceData = NULL;
-
static void
-signalBackgroundEventLoopExit(struct ev_loop *loop, ev_async *async, int revents) {
+signalLibevExit(struct ev_loop *loop, ev_async *async, int revents) {
BackgroundEventLoop *bg = (BackgroundEventLoop *) async->data;
- ev_break(bg->loop, EVBREAK_ALL);
+ if (bg->priv->usesLibuv) {
+ ev_async_stop(bg->libev_loop, &bg->priv->libuvActivitySignaller);
+ }
+ ev_async_stop(bg->libev_loop, &bg->priv->exitSignaller);
+ ev_break(bg->libev_loop, EVBREAK_ALL);
+ if (bg->priv->usesLibuv) {
+ uv_timer_stop(&bg->priv->libuv_timer);
+ uv_run(bg->libuv_loop, UV_RUN_NOWAIT);
+ }
}
static void
-startBackgroundLoop(BackgroundEventLoop *bg) {
- boost::unique_lock<boost::mutex> l(bg->priv->lock);
- bg->safe->setCurrentThread();
- bg->priv->started = true;
- bg->priv->cond.notify_all();
- l.unlock();
- ev_run(bg->loop, 0);
+onLibuvActivity(struct ev_loop *loop, ev_async *async, int revents) {
+ BackgroundEventLoop *bg = (BackgroundEventLoop *) async->data;
+ uv_run(bg->libuv_loop, UV_RUN_NOWAIT);
+ uv_sem_post(&bg->priv->libuv_sem);
}
static void
-eioRepeat(EV_P_ ev_idle *w, int revents) {
- if (eio_poll() != -1) {
- ev_idle_stop(EV_A_ w);
- }
+doNothing(uv_timer_t *timer) {
+ // Do nothing
}
-/* eio has some results, process them */
static void
-eioReady(EV_P_ ev_async *w, int revents) {
- if (eio_poll() == -1) {
- ev_idle_start(EV_A_ &eioInstanceData->priv->eioRepeatWatcher);
+runBackgroundLoop(BackgroundEventLoop *bg) {
+ bg->safe->setCurrentThread();
+ if (bg->priv->usesLibuv) {
+ uv_timer_start(&bg->priv->libuv_timer, doNothing, 99999000, 99999000);
+ uv_run(bg->libuv_loop, UV_RUN_NOWAIT);
}
+ uv_barrier_wait(&bg->priv->startBarrier);
+ ev_run(bg->libev_loop, 0);
}
-/* wake up the event loop */
static void
-eioWantPoll(void) {
- ev_async_send(eioInstanceData->loop, &eioInstanceData->priv->eioReadyWatcher);
+pollLibuv(BackgroundEventLoop *bg) {
+ uv_barrier_wait(&bg->priv->startBarrier);
+
+ int ret;
+ int fd;
+ int timeout;
+ int lastErrno;
+ bool intrRequested = false;
+ oxt::thread_local_context *ctx = oxt::get_thread_local_context();
+ assert(ctx != NULL);
+
+ fd = uv_backend_fd(&bg->priv->libuv_loop);
+
+ while (!this_thread::interruption_requested()) {
+ timeout = uv_backend_timeout(&bg->priv->libuv_loop);
+
+ ctx->syscall_interruption_lock.unlock();
+
+ do {
+ #if defined(HAVE_KQUEUE)
+ struct timespec ts;
+ struct kevent event;
+
+ ts.tv_sec = timeout / 1000;
+ ts.tv_nsec = (timeout % 1000) * 1000000;
+
+ ret = kevent(fd, NULL, 0, &event, 1, (timeout == -1) ? NULL : &ts);
+ #elif defined(HAVE_EPOLL)
+ struct epoll_event ev;
+ ret = epoll_wait(fd, &ev, 1, timeout);
+ #elif defined(HAVE_POLLSET)
+ struct pollfd event;
+ ret = pollset_poll(fd, &event, 1, timeout);
+ #elif defined(HAVE_EVENT_PORTS)
+ struct timespec ts;
+ struct port_event event;
+
+ ts.tv_sec = timeout / 1000;
+ ts.tv_nsec = (timeout % 1000) * 1000000;
+
+ ret = port_get(fd, &event, (timeout == -1) ? NULL : &ts);
+ #else
+ #error "This platform is not supported. Please add corresponding I/O polling code."
+ #endif
+
+ lastErrno = errno;
+ } while (ret == -1
+ && lastErrno == EINTR
+ && (!boost::this_thread::syscalls_interruptable()
+ || !(intrRequested = this_thread::interruption_requested())));
+
+ ctx->syscall_interruption_lock.lock();
+
+ if (ret == -1
+ && lastErrno == EINTR
+ && this_thread::syscalls_interruptable()
+ && intrRequested)
+ {
+ throw boost::thread_interrupted();
+ }
+
+ ev_async_send(bg->libev_loop, &bg->priv->libuvActivitySignaller);
+ uv_sem_wait(&bg->priv->libuv_sem);
+ }
}
-BackgroundEventLoop::BackgroundEventLoop(bool scalable, bool useLibeio) {
+BackgroundEventLoop::BackgroundEventLoop(bool scalable, bool usesLibuv)
+ : libev_loop(NULL),
+ libuv_loop(NULL),
+ priv(NULL)
+{
+ struct Guard {
+ BackgroundEventLoop *self;
+
+ Guard(BackgroundEventLoop *_self)
+ : self(_self)
+ { }
+
+ ~Guard() {
+ if (self != NULL) {
+ if (self->libev_loop != NULL) {
+ ev_loop_destroy(self->libev_loop);
+ }
+ if (self->libuv_loop != NULL) {
+ uv_loop_close(self->libuv_loop);
+ }
+ delete self->priv;
+ }
+ }
+
+ void clear() {
+ self = NULL;
+ }
+ };
+
TRACE_POINT();
+ Guard guard(this);
+ priv = new BackgroundEventLoopPrivate();
+
if (scalable) {
- loop = ev_loop_new(EVBACKEND_KQUEUE);
- if (loop == NULL) {
- loop = ev_loop_new(EVBACKEND_EPOLL);
+ libev_loop = ev_loop_new(EVBACKEND_KQUEUE);
+ if (libev_loop == NULL) {
+ libev_loop = ev_loop_new(EVBACKEND_EPOLL);
}
- if (loop == NULL) {
- loop = ev_loop_new(EVFLAG_AUTO);
+ if (libev_loop == NULL) {
+ libev_loop = ev_loop_new(EVFLAG_AUTO);
}
} else {
- loop = ev_loop_new(EVBACKEND_POLL);
+ libev_loop = ev_loop_new(EVBACKEND_POLL);
}
- if (loop == NULL) {
- throw RuntimeException("Cannot create an event loop");
+ if (libev_loop == NULL) {
+ throw RuntimeException("Cannot create a libev event loop");
}
- P_LOG_FILE_DESCRIPTOR_OPEN2(ev_backend_fd(loop), "libev event loop: backend FD");
+ P_LOG_FILE_DESCRIPTOR_OPEN2(ev_backend_fd(libev_loop), "libev event loop: backend FD");
- async = (ev_async *) malloc(sizeof(ev_async));
- async->data = this;
- ev_async_init(async, signalBackgroundEventLoopExit);
- ev_async_start(loop, async);
- P_LOG_FILE_DESCRIPTOR_OPEN2(ev_loop_get_pipe(loop, 0), "libev event loop: async pipe 0");
- P_LOG_FILE_DESCRIPTOR_OPEN2(ev_loop_get_pipe(loop, 1), "libev event loop: async pipe 1");
- safe = boost::make_shared<SafeLibev>(loop);
- priv = new BackgroundEventLoopPrivate();
- priv->thr = NULL;
- priv->useLibeio = useLibeio;
- priv->started = false;
+ ev_async_init(&priv->exitSignaller, signalLibevExit);
+ ev_async_start(libev_loop, &priv->exitSignaller);
+ P_LOG_FILE_DESCRIPTOR_OPEN2(ev_loop_get_pipe(libev_loop, 0), "libev event loop: async pipe 0");
+ P_LOG_FILE_DESCRIPTOR_OPEN2(ev_loop_get_pipe(libev_loop, 1), "libev event loop: async pipe 1");
+ priv->exitSignaller.data = this;
+ safe = boost::make_shared<SafeLibev>(libev_loop);
- if (useLibeio) {
- eioInstanceData = this;
- ev_idle_init(&priv->eioRepeatWatcher, eioRepeat);
- ev_async_init(&priv->eioReadyWatcher, eioReady);
- ev_async_start(loop, &priv->eioReadyWatcher);
- eio_init(eioWantPoll, 0);
+ uv_barrier_init(&priv->startBarrier, usesLibuv ? 3 : 2);
+
+ if (usesLibuv) {
+ ev_async_init(&priv->libuvActivitySignaller, onLibuvActivity);
+ ev_async_start(libev_loop, &priv->libuvActivitySignaller);
+ priv->libuvActivitySignaller.data = this;
+
+ libuv_loop = &priv->libuv_loop;
+ uv_loop_init(&priv->libuv_loop);
+ uv_timer_init(&priv->libuv_loop, &priv->libuv_timer);
+ uv_sem_init(&priv->libuv_sem, 0);
+ P_LOG_FILE_DESCRIPTOR_OPEN2(uv_backend_fd(libuv_loop), "libuv event loop: backend");
+ P_LOG_FILE_DESCRIPTOR_OPEN2(libuv_loop->signal_pipefd[0], "libuv event loop: signal pipe 0");
+ P_LOG_FILE_DESCRIPTOR_OPEN2(libuv_loop->signal_pipefd[1], "libuv event loop: signal pipe 1");
}
+
+ priv->thr = NULL;
+ priv->libuvPollerThr = NULL;
+ priv->usesLibuv = usesLibuv;
+ priv->started = false;
+ guard.clear();
}
BackgroundEventLoop::~BackgroundEventLoop() {
stop();
- if (priv->useLibeio) {
- ev_idle_stop(loop, &priv->eioRepeatWatcher);
- ev_async_stop(loop, &priv->eioReadyWatcher);
- eioInstanceData = NULL;
+ if (priv->usesLibuv) {
+ while (uv_loop_alive(libuv_loop)) {
+ uv_run(libuv_loop, UV_RUN_NOWAIT);
+ syscalls::usleep(10000);
+ }
+ uv_sem_destroy(&priv->libuv_sem);
+ P_LOG_FILE_DESCRIPTOR_CLOSE(uv_backend_fd(libuv_loop));
+ P_LOG_FILE_DESCRIPTOR_CLOSE(libuv_loop->signal_pipefd[0]);
+ P_LOG_FILE_DESCRIPTOR_CLOSE(libuv_loop->signal_pipefd[1]);
+ uv_loop_close(libuv_loop);
}
- ev_async_stop(loop, async);
+ uv_barrier_destroy(&priv->startBarrier);
delete priv;
- free(async);
}
void
BackgroundEventLoop::start(const string &threadName, unsigned int stackSize) {
assert(priv->thr == NULL);
- boost::unique_lock<boost::mutex> l(priv->lock);
priv->thr = new oxt::thread(
- boost::bind(startBackgroundLoop, this),
+ boost::bind(runBackgroundLoop, this),
threadName,
stackSize
);
- while (!priv->started) {
- priv->cond.wait(l);
+ if (priv->usesLibuv) {
+ priv->libuvPollerThr = new oxt::thread(
+ boost::bind(pollLibuv, this),
+ threadName + ": libuv poller",
+ 1024 * 512
+ );
}
+ uv_barrier_wait(&priv->startBarrier);
}
void
BackgroundEventLoop::stop() {
if (priv->thr != NULL) {
- ev_async_send(loop, async);
+ if (priv->usesLibuv) {
+ priv->libuvPollerThr->interrupt_and_join();
+ delete priv->libuvPollerThr;
+ priv->libuvPollerThr = NULL;
+ }
+ ev_async_send(libev_loop, &priv->exitSignaller);
priv->thr->join();
delete priv->thr;
priv->thr = NULL;
}
}