ext/polyphony/libev_agent.c in polyphony-0.43.1 vs ext/polyphony/libev_agent.c in polyphony-0.43.2
- old
+ new
@@ -1,8 +1,10 @@
+#include <netdb.h>
+#include <sys/socket.h>
+
#include "polyphony.h"
#include "../libev/ev.h"
-#include <sys/socket.h>
VALUE cLibevAgent = Qnil;
VALUE cTCPSocket;
struct LibevAgent_t {
@@ -126,11 +128,11 @@
int is_nowait = nowait == Qtrue;
struct LibevAgent_t *agent;
GetLibevAgent(self, agent);
if (is_nowait) {
- int runnable_count = RARRAY_LEN(queue);
+ long runnable_count = RARRAY_LEN(queue);
agent->run_no_wait_count++;
if (agent->run_no_wait_count < runnable_count || agent->run_no_wait_count < 10)
return self;
}
@@ -232,11 +234,11 @@
struct libev_io {
struct ev_io io;
VALUE fiber;
};
-static void LibevAgent_io_callback(EV_P_ ev_io *w, int revents)
+void LibevAgent_io_callback(EV_P_ ev_io *w, int revents)
{
struct libev_io *watcher = (struct libev_io *)w;
Fiber_make_runnable(watcher->fiber, Qnil);
}
@@ -253,15 +255,35 @@
struct LibevAgent_t *agent;
GetLibevAgent(self, agent);
return libev_await(agent);
}
+VALUE libev_io_wait(struct LibevAgent_t *agent, struct libev_io *watcher, rb_io_t *fptr, int flags) {
+ VALUE switchpoint_result;
+
+ if (watcher->fiber == Qnil) {
+ watcher->fiber = rb_fiber_current();
+ ev_io_init(&watcher->io, LibevAgent_io_callback, fptr->fd, flags);
+ }
+ ev_io_start(agent->ev_loop, &watcher->io);
+ switchpoint_result = libev_await(agent);
+ ev_io_stop(agent->ev_loop, &watcher->io);
+
+ RB_GC_GUARD(switchpoint_result);
+ return switchpoint_result;
+}
+
+VALUE libev_snooze() {
+ Fiber_make_runnable(rb_fiber_current(), Qnil);
+ return Thread_switch_fiber(rb_thread_current());
+}
+
VALUE LibevAgent_read(VALUE self, VALUE io, VALUE str, VALUE length, VALUE to_eof) {
struct LibevAgent_t *agent;
struct libev_io watcher;
rb_io_t *fptr;
- int len = NUM2INT(length);
+ long len = NUM2INT(length);
int shrinkable = io_setstrbuf(&str, len);
char *buf = RSTRING_PTR(str);
long total = 0;
VALUE switchpoint_result = Qnil;
int read_to_eof = RTEST(to_eof);
@@ -275,47 +297,31 @@
watcher.fiber = Qnil;
OBJ_TAINT(str);
while (len > 0) {
- int n = read(fptr->fd, buf, len);
- if (n == 0)
- break;
- if (n > 0) {
+ ssize_t n = read(fptr->fd, buf, len);
+ if (n < 0) {
+ int e = errno;
+ if (e != EWOULDBLOCK && e != EAGAIN) rb_syserr_fail(e, strerror(e));
+
+ switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ);
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
+ }
+ else {
+ switchpoint_result = libev_snooze();
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
+
+ if (n == 0) break; // EOF
+
total = total + n;
buf += n;
len -= n;
if (!read_to_eof || (len == 0)) break;
}
- else {
- int e = errno;
- if ((e == EWOULDBLOCK || e == EAGAIN)) {
- if (watcher.fiber == Qnil) {
- watcher.fiber = rb_fiber_current();
- ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ);
- }
- ev_io_start(agent->ev_loop, &watcher.io);
- switchpoint_result = libev_await(agent);
- ev_io_stop(agent->ev_loop, &watcher.io);
- if (TEST_EXCEPTION(switchpoint_result)) {
- goto error;
- }
- }
- else
- rb_syserr_fail(e, strerror(e));
- // rb_syserr_fail_path(e, fptr->pathv);
- }
}
- if (watcher.fiber == Qnil) {
- Fiber_make_runnable(rb_fiber_current(), Qnil);
- switchpoint_result = Thread_switch_fiber(rb_thread_current());
- if (TEST_EXCEPTION(switchpoint_result)) {
- goto error;
- }
- }
-
if (total == 0) return Qnil;
io_set_read_length(str, total, shrinkable);
io_enc_str(str, fptr);
@@ -345,12 +351,12 @@
struct LibevAgent_t *agent;
struct libev_io watcher;
rb_io_t *fptr;
VALUE str;
- int total;
- int len = 8192;
+ long total;
+ long len = 8192;
int shrinkable;
char *buf;
VALUE switchpoint_result = Qnil;
VALUE underlying_io = rb_iv_get(io, "@io");
@@ -364,40 +370,32 @@
watcher.fiber = Qnil;
OBJ_TAINT(str);
while (1) {
- int n = read(fptr->fd, buf, len);
- if (n == 0)
- break;
- if (n > 0) {
+ ssize_t n = read(fptr->fd, buf, len);
+ if (n < 0) {
+ int e = errno;
+ if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
+
+ switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ);
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
+ }
+ else {
+ switchpoint_result = libev_snooze();
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
+
+ if (n == 0) break; // EOF
+
total = n;
YIELD_STR();
Fiber_make_runnable(rb_fiber_current(), Qnil);
switchpoint_result = Thread_switch_fiber(rb_thread_current());
if (TEST_EXCEPTION(switchpoint_result)) {
goto error;
}
}
- else {
- int e = errno;
- if ((e == EWOULDBLOCK || e == EAGAIN)) {
- if (watcher.fiber == Qnil) {
- watcher.fiber = rb_fiber_current();
- ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ);
- }
- ev_io_start(agent->ev_loop, &watcher.io);
- switchpoint_result = libev_await(agent);
- ev_io_stop(agent->ev_loop, &watcher.io);
- if (TEST_EXCEPTION(switchpoint_result)) {
- goto error;
- }
- }
- else
- rb_syserr_fail(e, strerror(e));
- // rb_syserr_fail_path(e, fptr->pathv);
- }
}
RB_GC_GUARD(str);
RB_GC_GUARD(watcher.fiber);
RB_GC_GUARD(switchpoint_result);
@@ -412,52 +410,35 @@
struct libev_io watcher;
rb_io_t *fptr;
VALUE switchpoint_result = Qnil;
char *buf = StringValuePtr(str);
- int len = RSTRING_LEN(str);
- int left = len;
+ long len = RSTRING_LEN(str);
+ long left = len;
VALUE underlying_io = rb_iv_get(io, "@io");
if (underlying_io != Qnil) io = underlying_io;
GetLibevAgent(self, agent);
io = rb_io_get_write_io(io);
GetOpenFile(io, fptr);
watcher.fiber = Qnil;
while (left > 0) {
- int result = write(fptr->fd, buf, left);
- if (result < 0) {
+ ssize_t n = write(fptr->fd, buf, left);
+ if (n < 0) {
int e = errno;
- if (e == EAGAIN) {
- if (watcher.fiber == Qnil) {
- watcher.fiber = rb_fiber_current();
- ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_WRITE);
- }
- ev_io_start(agent->ev_loop, &watcher.io);
- switchpoint_result = libev_await(agent);
- ev_io_stop(agent->ev_loop, &watcher.io);
- if (TEST_EXCEPTION(switchpoint_result))
- goto error;
- }
- else {
- rb_syserr_fail(e, strerror(e));
- // rb_syserr_fail_path(e, fptr->pathv);
-
- }
+ if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
+
+ switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE);
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
}
else {
- buf += result;
- left -= result;
- }
- }
+ switchpoint_result = libev_snooze();
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
- if (watcher.fiber == Qnil) {
- Fiber_make_runnable(rb_fiber_current(), Qnil);
- switchpoint_result = Thread_switch_fiber(rb_thread_current());
- if (TEST_EXCEPTION(switchpoint_result)) {
- goto error;
+ buf += n;
+ left -= n;
}
}
RB_GC_GUARD(watcher.fiber);
RB_GC_GUARD(switchpoint_result);
@@ -486,65 +467,54 @@
watcher.fiber = Qnil;
while (1) {
fd = accept(fptr->fd, &addr, &len);
if (fd < 0) {
int e = errno;
- if (e == EWOULDBLOCK || e == EAGAIN) {
- if (watcher.fiber == Qnil) {
- watcher.fiber = rb_fiber_current();
- ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ);
- }
- ev_io_start(agent->ev_loop, &watcher.io);
- switchpoint_result = libev_await(agent);
- ev_io_stop(agent->ev_loop, &watcher.io);
+ if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
- TEST_RESUME_EXCEPTION(switchpoint_result);
- RB_GC_GUARD(watcher.fiber);
- RB_GC_GUARD(switchpoint_result);
- }
- else
- rb_syserr_fail(e, strerror(e));
- // rb_syserr_fail_path(e, fptr->pathv);
+ switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ);
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
}
else {
- VALUE connection = rb_obj_alloc(cTCPSocket);
+ VALUE socket;
rb_io_t *fp;
- MakeOpenFile(connection, fp);
+ switchpoint_result = libev_snooze();
+ if (TEST_EXCEPTION(switchpoint_result)) {
+ close(fd); // close fd since we're raising an exception
+ goto error;
+ }
+
+ socket = rb_obj_alloc(cTCPSocket);
+ MakeOpenFile(socket, fp);
rb_update_max_fd(fd);
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
- rb_io_ascii8bit_binmode(connection);
+ rb_io_ascii8bit_binmode(socket);
rb_io_set_nonblock(fp);
rb_io_synchronized(fp);
// if (rsock_do_not_reverse_lookup) {
// fp->mode |= FMODE_NOREVLOOKUP;
// }
-
- if (watcher.fiber == Qnil) {
- Fiber_make_runnable(rb_fiber_current(), Qnil);
- switchpoint_result = Thread_switch_fiber(rb_thread_current());
- if (TEST_EXCEPTION(switchpoint_result)) {
- return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result);
- }
- }
-
- return connection;
+ return socket;
}
}
+ RB_GC_GUARD(switchpoint_result);
return Qnil;
+error:
+ return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result);
}
VALUE LibevAgent_accept_loop(VALUE self, VALUE sock) {
struct LibevAgent_t *agent;
struct libev_io watcher;
rb_io_t *fptr;
int fd;
struct sockaddr addr;
socklen_t len = (socklen_t)sizeof addr;
VALUE switchpoint_result = Qnil;
- VALUE connection = Qnil;
+ VALUE socket = Qnil;
VALUE underlying_sock = rb_iv_get(sock, "@io");
if (underlying_sock != Qnil) sock = underlying_sock;
GetLibevAgent(self, agent);
GetOpenFile(sock, fptr);
@@ -553,50 +523,86 @@
while (1) {
fd = accept(fptr->fd, &addr, &len);
if (fd < 0) {
int e = errno;
- if (e == EWOULDBLOCK || e == EAGAIN) {
- if (watcher.fiber == Qnil) {
- watcher.fiber = rb_fiber_current();
- ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ);
- }
- ev_io_start(agent->ev_loop, &watcher.io);
- switchpoint_result = libev_await(agent);
- ev_io_stop(agent->ev_loop, &watcher.io);
+ if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
- TEST_RESUME_EXCEPTION(switchpoint_result);
- }
- else
- rb_syserr_fail(e, strerror(e));
- // rb_syserr_fail_path(e, fptr->pathv);
+ switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ);
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
}
else {
rb_io_t *fp;
- connection = rb_obj_alloc(cTCPSocket);
- MakeOpenFile(connection, fp);
+ switchpoint_result = libev_snooze();
+ if (TEST_EXCEPTION(switchpoint_result)) {
+ close(fd); // close fd since we're raising an exception
+ goto error;
+ }
+
+ socket = rb_obj_alloc(cTCPSocket);
+ MakeOpenFile(socket, fp);
rb_update_max_fd(fd);
fp->fd = fd;
fp->mode = FMODE_READWRITE | FMODE_DUPLEX;
- rb_io_ascii8bit_binmode(connection);
+ rb_io_ascii8bit_binmode(socket);
rb_io_set_nonblock(fp);
rb_io_synchronized(fp);
- rb_yield(connection);
- connection = Qnil;
-
- Fiber_make_runnable(rb_fiber_current(), Qnil);
- switchpoint_result = Thread_switch_fiber(rb_thread_current());
- TEST_RESUME_EXCEPTION(switchpoint_result);
+ rb_yield(socket);
+ socket = Qnil;
}
}
- RB_GC_GUARD(connection);
+ RB_GC_GUARD(socket);
RB_GC_GUARD(watcher.fiber);
RB_GC_GUARD(switchpoint_result);
+ return Qnil;
+error:
+ return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result);
}
+// VALUE LibevAgent_connect(VALUE self, VALUE sock, VALUE host, VALUE port) {
+// struct LibevAgent_t *agent;
+// struct libev_io watcher;
+// rb_io_t *fptr;
+// struct sockaddr_in addr;
+// char *host_buf = StringValueCStr(host);
+// VALUE switchpoint_result = Qnil;
+// VALUE underlying_sock = rb_iv_get(sock, "@io");
+// if (underlying_sock != Qnil) sock = underlying_sock;
+
+// GetLibevAgent(self, agent);
+// GetOpenFile(sock, fptr);
+// rb_io_set_nonblock(fptr);
+// watcher.fiber = Qnil;
+
+// addr.sin_family = AF_INET;
+// addr.sin_addr.s_addr = inet_addr(host_buf);
+// addr.sin_port = htons(NUM2INT(port));
+
+// while (1) {
+// int result = connect(fptr->fd, &addr, sizeof(addr));
+// if (result < 0) {
+// int e = errno;
+// if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
+
+// switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE);
+// if (TEST_EXCEPTION(switchpoint_result)) goto error;
+// }
+// else {
+// switchpoint_result = libev_snooze();
+// if (TEST_EXCEPTION(switchpoint_result)) goto error;
+
+// return sock;
+// }
+// }
+// RB_GC_GUARD(switchpoint_result);
+// return Qnil;
+// error:
+// return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result);
+// }
+
VALUE LibevAgent_wait_io(VALUE self, VALUE io, VALUE write) {
struct LibevAgent_t *agent;
struct libev_io watcher;
rb_io_t *fptr;
VALUE switchpoint_result = Qnil;
@@ -622,11 +628,11 @@
struct libev_timer {
struct ev_timer timer;
VALUE fiber;
};
-static void LibevAgent_timer_callback(EV_P_ ev_timer *w, int revents)
+void LibevAgent_timer_callback(EV_P_ ev_timer *w, int revents)
{
struct libev_timer *watcher = (struct libev_timer *)w;
Fiber_make_runnable(watcher->fiber, Qnil);
}
@@ -652,11 +658,11 @@
struct libev_child {
struct ev_child child;
VALUE fiber;
};
-static void LibevAgent_child_callback(EV_P_ ev_child *w, int revents)
+void LibevAgent_child_callback(EV_P_ ev_child *w, int revents)
{
struct libev_child *watcher = (struct libev_child *)w;
int exit_status = w->rstatus >> 8; // weird, why should we do this?
VALUE status;
@@ -710,9 +716,10 @@
rb_define_method(cLibevAgent, "read", LibevAgent_read, 4);
rb_define_method(cLibevAgent, "read_loop", LibevAgent_read_loop, 1);
rb_define_method(cLibevAgent, "write", LibevAgent_write, 2);
rb_define_method(cLibevAgent, "accept", LibevAgent_accept, 1);
rb_define_method(cLibevAgent, "accept_loop", LibevAgent_accept_loop, 1);
+ // rb_define_method(cLibevAgent, "connect", LibevAgent_accept, 3);
rb_define_method(cLibevAgent, "wait_io", LibevAgent_wait_io, 2);
rb_define_method(cLibevAgent, "sleep", LibevAgent_sleep, 1);
rb_define_method(cLibevAgent, "waitpid", LibevAgent_waitpid, 1);
}