ext/polyphony/libev_agent.c in polyphony-0.43.1 vs ext/polyphony/libev_agent.c in polyphony-0.43.2

- old
+ new

@@ -1,8 +1,10 @@ +#include <netdb.h> +#include <sys/socket.h> + #include "polyphony.h" #include "../libev/ev.h" -#include <sys/socket.h> VALUE cLibevAgent = Qnil; VALUE cTCPSocket; struct LibevAgent_t { @@ -126,11 +128,11 @@ int is_nowait = nowait == Qtrue; struct LibevAgent_t *agent; GetLibevAgent(self, agent); if (is_nowait) { - int runnable_count = RARRAY_LEN(queue); + long runnable_count = RARRAY_LEN(queue); agent->run_no_wait_count++; if (agent->run_no_wait_count < runnable_count || agent->run_no_wait_count < 10) return self; } @@ -232,11 +234,11 @@ struct libev_io { struct ev_io io; VALUE fiber; }; -static void LibevAgent_io_callback(EV_P_ ev_io *w, int revents) +void LibevAgent_io_callback(EV_P_ ev_io *w, int revents) { struct libev_io *watcher = (struct libev_io *)w; Fiber_make_runnable(watcher->fiber, Qnil); } @@ -253,15 +255,35 @@ struct LibevAgent_t *agent; GetLibevAgent(self, agent); return libev_await(agent); } +VALUE libev_io_wait(struct LibevAgent_t *agent, struct libev_io *watcher, rb_io_t *fptr, int flags) { + VALUE switchpoint_result; + + if (watcher->fiber == Qnil) { + watcher->fiber = rb_fiber_current(); + ev_io_init(&watcher->io, LibevAgent_io_callback, fptr->fd, flags); + } + ev_io_start(agent->ev_loop, &watcher->io); + switchpoint_result = libev_await(agent); + ev_io_stop(agent->ev_loop, &watcher->io); + + RB_GC_GUARD(switchpoint_result); + return switchpoint_result; +} + +VALUE libev_snooze() { + Fiber_make_runnable(rb_fiber_current(), Qnil); + return Thread_switch_fiber(rb_thread_current()); +} + VALUE LibevAgent_read(VALUE self, VALUE io, VALUE str, VALUE length, VALUE to_eof) { struct LibevAgent_t *agent; struct libev_io watcher; rb_io_t *fptr; - int len = NUM2INT(length); + long len = NUM2INT(length); int shrinkable = io_setstrbuf(&str, len); char *buf = RSTRING_PTR(str); long total = 0; VALUE switchpoint_result = Qnil; int read_to_eof = RTEST(to_eof); @@ -275,47 +297,31 @@ watcher.fiber = Qnil; OBJ_TAINT(str); while (len > 0) { - int n = read(fptr->fd, buf, len); - if (n == 0) - break; - if (n > 0) { + ssize_t n = read(fptr->fd, buf, len); + if (n < 0) { + int e = errno; + if (e != EWOULDBLOCK && e != EAGAIN) rb_syserr_fail(e, strerror(e)); + + switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ); + if (TEST_EXCEPTION(switchpoint_result)) goto error; + } + else { + switchpoint_result = libev_snooze(); + if (TEST_EXCEPTION(switchpoint_result)) goto error; + + if (n == 0) break; // EOF + total = total + n; buf += n; len -= n; if (!read_to_eof || (len == 0)) break; } - else { - int e = errno; - if ((e == EWOULDBLOCK || e == EAGAIN)) { - if (watcher.fiber == Qnil) { - watcher.fiber = rb_fiber_current(); - ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ); - } - ev_io_start(agent->ev_loop, &watcher.io); - switchpoint_result = libev_await(agent); - ev_io_stop(agent->ev_loop, &watcher.io); - if (TEST_EXCEPTION(switchpoint_result)) { - goto error; - } - } - else - rb_syserr_fail(e, strerror(e)); - // rb_syserr_fail_path(e, fptr->pathv); - } } - if (watcher.fiber == Qnil) { - Fiber_make_runnable(rb_fiber_current(), Qnil); - switchpoint_result = Thread_switch_fiber(rb_thread_current()); - if (TEST_EXCEPTION(switchpoint_result)) { - goto error; - } - } - if (total == 0) return Qnil; io_set_read_length(str, total, shrinkable); io_enc_str(str, fptr); @@ -345,12 +351,12 @@ struct LibevAgent_t *agent; struct libev_io watcher; rb_io_t *fptr; VALUE str; - int total; - int len = 8192; + long total; + long len = 8192; int shrinkable; char *buf; VALUE switchpoint_result = Qnil; VALUE underlying_io = rb_iv_get(io, "@io"); @@ -364,40 +370,32 @@ watcher.fiber = Qnil; OBJ_TAINT(str); while (1) { - int n = read(fptr->fd, buf, len); - if (n == 0) - break; - if (n > 0) { + ssize_t n = read(fptr->fd, buf, len); + if (n < 0) { + int e = errno; + if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); + + switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ); + if (TEST_EXCEPTION(switchpoint_result)) goto error; + } + else { + switchpoint_result = libev_snooze(); + if (TEST_EXCEPTION(switchpoint_result)) goto error; + + if (n == 0) break; // EOF + total = n; YIELD_STR(); Fiber_make_runnable(rb_fiber_current(), Qnil); switchpoint_result = Thread_switch_fiber(rb_thread_current()); if (TEST_EXCEPTION(switchpoint_result)) { goto error; } } - else { - int e = errno; - if ((e == EWOULDBLOCK || e == EAGAIN)) { - if (watcher.fiber == Qnil) { - watcher.fiber = rb_fiber_current(); - ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ); - } - ev_io_start(agent->ev_loop, &watcher.io); - switchpoint_result = libev_await(agent); - ev_io_stop(agent->ev_loop, &watcher.io); - if (TEST_EXCEPTION(switchpoint_result)) { - goto error; - } - } - else - rb_syserr_fail(e, strerror(e)); - // rb_syserr_fail_path(e, fptr->pathv); - } } RB_GC_GUARD(str); RB_GC_GUARD(watcher.fiber); RB_GC_GUARD(switchpoint_result); @@ -412,52 +410,35 @@ struct libev_io watcher; rb_io_t *fptr; VALUE switchpoint_result = Qnil; char *buf = StringValuePtr(str); - int len = RSTRING_LEN(str); - int left = len; + long len = RSTRING_LEN(str); + long left = len; VALUE underlying_io = rb_iv_get(io, "@io"); if (underlying_io != Qnil) io = underlying_io; GetLibevAgent(self, agent); io = rb_io_get_write_io(io); GetOpenFile(io, fptr); watcher.fiber = Qnil; while (left > 0) { - int result = write(fptr->fd, buf, left); - if (result < 0) { + ssize_t n = write(fptr->fd, buf, left); + if (n < 0) { int e = errno; - if (e == EAGAIN) { - if (watcher.fiber == Qnil) { - watcher.fiber = rb_fiber_current(); - ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_WRITE); - } - ev_io_start(agent->ev_loop, &watcher.io); - switchpoint_result = libev_await(agent); - ev_io_stop(agent->ev_loop, &watcher.io); - if (TEST_EXCEPTION(switchpoint_result)) - goto error; - } - else { - rb_syserr_fail(e, strerror(e)); - // rb_syserr_fail_path(e, fptr->pathv); - - } + if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); + + switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE); + if (TEST_EXCEPTION(switchpoint_result)) goto error; } else { - buf += result; - left -= result; - } - } + switchpoint_result = libev_snooze(); + if (TEST_EXCEPTION(switchpoint_result)) goto error; - if (watcher.fiber == Qnil) { - Fiber_make_runnable(rb_fiber_current(), Qnil); - switchpoint_result = Thread_switch_fiber(rb_thread_current()); - if (TEST_EXCEPTION(switchpoint_result)) { - goto error; + buf += n; + left -= n; } } RB_GC_GUARD(watcher.fiber); RB_GC_GUARD(switchpoint_result); @@ -486,65 +467,54 @@ watcher.fiber = Qnil; while (1) { fd = accept(fptr->fd, &addr, &len); if (fd < 0) { int e = errno; - if (e == EWOULDBLOCK || e == EAGAIN) { - if (watcher.fiber == Qnil) { - watcher.fiber = rb_fiber_current(); - ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ); - } - ev_io_start(agent->ev_loop, &watcher.io); - switchpoint_result = libev_await(agent); - ev_io_stop(agent->ev_loop, &watcher.io); + if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); - TEST_RESUME_EXCEPTION(switchpoint_result); - RB_GC_GUARD(watcher.fiber); - RB_GC_GUARD(switchpoint_result); - } - else - rb_syserr_fail(e, strerror(e)); - // rb_syserr_fail_path(e, fptr->pathv); + switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ); + if (TEST_EXCEPTION(switchpoint_result)) goto error; } else { - VALUE connection = rb_obj_alloc(cTCPSocket); + VALUE socket; rb_io_t *fp; - MakeOpenFile(connection, fp); + switchpoint_result = libev_snooze(); + if (TEST_EXCEPTION(switchpoint_result)) { + close(fd); // close fd since we're raising an exception + goto error; + } + + socket = rb_obj_alloc(cTCPSocket); + MakeOpenFile(socket, fp); rb_update_max_fd(fd); fp->fd = fd; fp->mode = FMODE_READWRITE | FMODE_DUPLEX; - rb_io_ascii8bit_binmode(connection); + rb_io_ascii8bit_binmode(socket); rb_io_set_nonblock(fp); rb_io_synchronized(fp); // if (rsock_do_not_reverse_lookup) { // fp->mode |= FMODE_NOREVLOOKUP; // } - - if (watcher.fiber == Qnil) { - Fiber_make_runnable(rb_fiber_current(), Qnil); - switchpoint_result = Thread_switch_fiber(rb_thread_current()); - if (TEST_EXCEPTION(switchpoint_result)) { - return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result); - } - } - - return connection; + return socket; } } + RB_GC_GUARD(switchpoint_result); return Qnil; +error: + return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result); } VALUE LibevAgent_accept_loop(VALUE self, VALUE sock) { struct LibevAgent_t *agent; struct libev_io watcher; rb_io_t *fptr; int fd; struct sockaddr addr; socklen_t len = (socklen_t)sizeof addr; VALUE switchpoint_result = Qnil; - VALUE connection = Qnil; + VALUE socket = Qnil; VALUE underlying_sock = rb_iv_get(sock, "@io"); if (underlying_sock != Qnil) sock = underlying_sock; GetLibevAgent(self, agent); GetOpenFile(sock, fptr); @@ -553,50 +523,86 @@ while (1) { fd = accept(fptr->fd, &addr, &len); if (fd < 0) { int e = errno; - if (e == EWOULDBLOCK || e == EAGAIN) { - if (watcher.fiber == Qnil) { - watcher.fiber = rb_fiber_current(); - ev_io_init(&watcher.io, LibevAgent_io_callback, fptr->fd, EV_READ); - } - ev_io_start(agent->ev_loop, &watcher.io); - switchpoint_result = libev_await(agent); - ev_io_stop(agent->ev_loop, &watcher.io); + if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); - TEST_RESUME_EXCEPTION(switchpoint_result); - } - else - rb_syserr_fail(e, strerror(e)); - // rb_syserr_fail_path(e, fptr->pathv); + switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_READ); + if (TEST_EXCEPTION(switchpoint_result)) goto error; } else { rb_io_t *fp; - connection = rb_obj_alloc(cTCPSocket); - MakeOpenFile(connection, fp); + switchpoint_result = libev_snooze(); + if (TEST_EXCEPTION(switchpoint_result)) { + close(fd); // close fd since we're raising an exception + goto error; + } + + socket = rb_obj_alloc(cTCPSocket); + MakeOpenFile(socket, fp); rb_update_max_fd(fd); fp->fd = fd; fp->mode = FMODE_READWRITE | FMODE_DUPLEX; - rb_io_ascii8bit_binmode(connection); + rb_io_ascii8bit_binmode(socket); rb_io_set_nonblock(fp); rb_io_synchronized(fp); - rb_yield(connection); - connection = Qnil; - - Fiber_make_runnable(rb_fiber_current(), Qnil); - switchpoint_result = Thread_switch_fiber(rb_thread_current()); - TEST_RESUME_EXCEPTION(switchpoint_result); + rb_yield(socket); + socket = Qnil; } } - RB_GC_GUARD(connection); + RB_GC_GUARD(socket); RB_GC_GUARD(watcher.fiber); RB_GC_GUARD(switchpoint_result); + return Qnil; +error: + return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result); } +// VALUE LibevAgent_connect(VALUE self, VALUE sock, VALUE host, VALUE port) { +// struct LibevAgent_t *agent; +// struct libev_io watcher; +// rb_io_t *fptr; +// struct sockaddr_in addr; +// char *host_buf = StringValueCStr(host); +// VALUE switchpoint_result = Qnil; +// VALUE underlying_sock = rb_iv_get(sock, "@io"); +// if (underlying_sock != Qnil) sock = underlying_sock; + +// GetLibevAgent(self, agent); +// GetOpenFile(sock, fptr); +// rb_io_set_nonblock(fptr); +// watcher.fiber = Qnil; + +// addr.sin_family = AF_INET; +// addr.sin_addr.s_addr = inet_addr(host_buf); +// addr.sin_port = htons(NUM2INT(port)); + +// while (1) { +// int result = connect(fptr->fd, &addr, sizeof(addr)); +// if (result < 0) { +// int e = errno; +// if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); + +// switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE); +// if (TEST_EXCEPTION(switchpoint_result)) goto error; +// } +// else { +// switchpoint_result = libev_snooze(); +// if (TEST_EXCEPTION(switchpoint_result)) goto error; + +// return sock; +// } +// } +// RB_GC_GUARD(switchpoint_result); +// return Qnil; +// error: +// return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result); +// } + VALUE LibevAgent_wait_io(VALUE self, VALUE io, VALUE write) { struct LibevAgent_t *agent; struct libev_io watcher; rb_io_t *fptr; VALUE switchpoint_result = Qnil; @@ -622,11 +628,11 @@ struct libev_timer { struct ev_timer timer; VALUE fiber; }; -static void LibevAgent_timer_callback(EV_P_ ev_timer *w, int revents) +void LibevAgent_timer_callback(EV_P_ ev_timer *w, int revents) { struct libev_timer *watcher = (struct libev_timer *)w; Fiber_make_runnable(watcher->fiber, Qnil); } @@ -652,11 +658,11 @@ struct libev_child { struct ev_child child; VALUE fiber; }; -static void LibevAgent_child_callback(EV_P_ ev_child *w, int revents) +void LibevAgent_child_callback(EV_P_ ev_child *w, int revents) { struct libev_child *watcher = (struct libev_child *)w; int exit_status = w->rstatus >> 8; // weird, why should we do this? VALUE status; @@ -710,9 +716,10 @@ rb_define_method(cLibevAgent, "read", LibevAgent_read, 4); rb_define_method(cLibevAgent, "read_loop", LibevAgent_read_loop, 1); rb_define_method(cLibevAgent, "write", LibevAgent_write, 2); rb_define_method(cLibevAgent, "accept", LibevAgent_accept, 1); rb_define_method(cLibevAgent, "accept_loop", LibevAgent_accept_loop, 1); + // rb_define_method(cLibevAgent, "connect", LibevAgent_accept, 3); rb_define_method(cLibevAgent, "wait_io", LibevAgent_wait_io, 2); rb_define_method(cLibevAgent, "sleep", LibevAgent_sleep, 1); rb_define_method(cLibevAgent, "waitpid", LibevAgent_waitpid, 1); }