Sha256: 6f0cdd909c32ed14c122a85f4e11fa5bcb1db298a8a8b136e2ef456d62ddfdad

Contents?: true

Size: 1.98 KB

Versions: 3

Compression:

Stored size: 1.98 KB

Contents

/* kqueue event adapter */

#pragma once

#include <sys/types.h>
#include <sys/time.h>
#ifdef HAVE_SYS_EVENT_H
# include <sys/event.h>
#else
# include <sys/queue.h>
#endif

#define MAX_E 1024
static int qfd;
static struct kevent qevents[MAX_E];

static void ADD_E(int fd, uint64_t etype) {
  struct kevent e;
  // without EV_CLEAR, it is level-triggered
  // http://www.cs.helsinki.fi/linux/linux-kernel/2001-38/0547.html
  EV_SET(&e, fd, EVFILT_READ | EVFILT_WRITE, EV_ADD, 0, 0, (void*)etype);
# ifdef NDEBUG
  kevent(qfd, &e, 1, NULL, 0, NULL);
# else
  if (kevent(qfd, &e, 1, NULL, 0, NULL))
    printf("%s: %s\n", __func__, strerror(errno));
# endif
}

static void DEL_E_WITH_FILTER(int fd, int filter) {
  struct kevent e;
  EV_SET(&e, fd, filter, EV_DELETE, 0, 0, NULL);
# ifdef NDEBUG
  kevent(qfd, &e, 1, NULL, 0, NULL);
# else
  if (kevent(qfd, &e, 1, NULL, 0, NULL))
    printf("%s: %s\n", __func__, strerror(errno));
# endif
}

static void DEL_E(int fd) {
  DEL_E_WITH_FILTER(fd, EVFILT_READ | EVFILT_WRITE);
}

static void INIT_E() {
  qfd = kqueue();
  if (qfd == -1) {
    rb_sys_fail("kqueue(2)");
  }
}

static void LOOP_E() {
  // printf("%d,%d,%d,\n%d,%d,%d,\n%d,%d,%d,\n",
  // EV_ADD, EV_ENABLE, EV_DISABLE,
  // EV_DELETE, EV_RECEIPT, EV_ONESHOT,
  // EV_CLEAR, EV_EOF, EV_ERROR);

  struct timespec ts = {0, 1000 * 1000 * 100};
  while (1) {
    // heart beat of 0.1 sec, allow ruby signal interrupts to be inserted
    int sz = kevent(qfd, NULL, 0, qevents, MAX_E, &ts);

    for (int i = 0; i < sz; i++) {
      int fd = (int)qevents[i].ident;
      if (qevents[i].flags & EV_EOF) {
        // EV_EOF is set if the read side of the socket is shutdown
        // the event can keep flipping back to consume cpu if we don't remove it
        DEL_E_WITH_FILTER(fd, qevents[i].filter);
      }
      if (qevents[i].filter & (EVFILT_READ | EVFILT_WRITE)) {
        loop_body(fd, (int)qevents[i].udata);
        break;
      }
    }
    // execute other thread / interrupts
    rb_thread_schedule();
  }
}

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
nyara-0.0.1.pre.6 ext/inc/kqueue.h
nyara-0.0.1.pre.5 ext/inc/kqueue.h
nyara-0.0.1.pre.4 ext/inc/kqueue.h