ext/polyphony/libev_agent.c in polyphony-0.43.5 vs ext/polyphony/libev_agent.c in polyphony-0.43.6

- old
+ new

@@ -1,7 +1,8 @@ #include <netdb.h> #include <sys/socket.h> +#include <sys/uio.h> #include "polyphony.h" #include "../libev/ev.h" VALUE cLibevAgent = Qnil; @@ -411,68 +412,134 @@ return io; error: return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result); } -VALUE LibevAgent_write(int argc, VALUE *argv, VALUE self) { +VALUE LibevAgent_write(VALUE self, VALUE io, VALUE str) { struct LibevAgent_t *agent; struct libev_io watcher; rb_io_t *fptr; VALUE switchpoint_result = Qnil; - VALUE io; VALUE underlying_io; + char *buf = StringValuePtr(str); + long len = RSTRING_LEN(str); + long left = len; + + underlying_io = rb_iv_get(io, "@io"); + if (underlying_io != Qnil) io = underlying_io; + GetLibevAgent(self, agent); + io = rb_io_get_write_io(io); + GetOpenFile(io, fptr); + watcher.fiber = Qnil; + + while (left > 0) { + ssize_t n = write(fptr->fd, buf, left); + if (n < 0) { + int e = errno; + if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); + switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE); + if (TEST_EXCEPTION(switchpoint_result)) goto error; + } + else { + buf += n; + left -= n; + } + } + + if (watcher.fiber == Qnil) { + switchpoint_result = libev_snooze(); + if (TEST_EXCEPTION(switchpoint_result)) goto error; + } + + RB_GC_GUARD(watcher.fiber); + RB_GC_GUARD(switchpoint_result); + + return INT2NUM(len); +error: + return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result); +} + +VALUE LibevAgent_writev(VALUE self, VALUE io, int argc, VALUE *argv) { + struct LibevAgent_t *agent; + struct libev_io watcher; + rb_io_t *fptr; + VALUE switchpoint_result = Qnil; + VALUE underlying_io; + long total_length = 0; long total_written = 0; - int arg_idx = 1; + struct iovec *iov = 0; + struct iovec *iov_ptr = 0; + int iov_count = argc; - if (argc < 2) - rb_raise(rb_eRuntimeError, "(wrong number of arguments (expected 2 or more))"); - - io = argv[0]; underlying_io = rb_iv_get(io, "@io"); if (underlying_io != Qnil) io = underlying_io; GetLibevAgent(self, agent); io = rb_io_get_write_io(io); GetOpenFile(io, fptr); watcher.fiber = Qnil; - while (arg_idx < argc) { - VALUE str = argv[arg_idx]; - char *buf = StringValuePtr(str); - long len = RSTRING_LEN(str); - long left = len; + iov = malloc(iov_count * sizeof(struct iovec)); + for (int i = 0; i < argc; i++) { + VALUE str = argv[i]; + iov[i].iov_base = StringValuePtr(str); + iov[i].iov_len = RSTRING_LEN(str); + total_length += iov[i].iov_len; + } + iov_ptr = iov; - while (left > 0) { - ssize_t n = write(fptr->fd, buf, left); - if (n < 0) { - int e = errno; - if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); + while (1) { + ssize_t n = writev(fptr->fd, iov_ptr, iov_count); + if (n < 0) { + int e = errno; + if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e)); - switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE); - if (TEST_EXCEPTION(switchpoint_result)) goto error; + switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE); + if (TEST_EXCEPTION(switchpoint_result)) goto error; + } + else { + total_written += n; + if (total_written == total_length) break; + + while (n > 0) { + if ((size_t) n < iov_ptr[0].iov_len) { + iov_ptr[0].iov_base = (char *) iov_ptr[0].iov_base + n; + iov_ptr[0].iov_len -= n; + n = 0; + } + else { + n -= iov_ptr[0].iov_len; + iov_ptr += 1; + iov_count -= 1; + } } - else { - buf += n; - left -= n; - } } - total_written += len; - arg_idx++; } - if (watcher.fiber == Qnil) { switchpoint_result = libev_snooze(); if (TEST_EXCEPTION(switchpoint_result)) goto error; } RB_GC_GUARD(watcher.fiber); RB_GC_GUARD(switchpoint_result); + free(iov); return INT2NUM(total_written); error: + free(iov); return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result); } +VALUE LibevAgent_write_m(int argc, VALUE *argv, VALUE self) { + if (argc < 2) + // TODO: raise ArgumentError + rb_raise(rb_eRuntimeError, "(wrong number of arguments (expected 2 or more))"); + + return (argc == 2) ? + LibevAgent_write(self, argv[0], argv[1]) : + LibevAgent_writev(self, argv[0], argc - 1, argv + 1); +} + /////////////////////////////////////////////////////////////////////////// VALUE LibevAgent_accept(VALUE self, VALUE sock) { struct LibevAgent_t *agent; struct libev_io watcher; @@ -736,10 +803,10 @@ rb_define_method(cLibevAgent, "poll", LibevAgent_poll, 3); rb_define_method(cLibevAgent, "break", LibevAgent_break, 0); rb_define_method(cLibevAgent, "read", LibevAgent_read, 4); rb_define_method(cLibevAgent, "read_loop", LibevAgent_read_loop, 1); - rb_define_method(cLibevAgent, "write", LibevAgent_write, -1); + rb_define_method(cLibevAgent, "write", LibevAgent_write_m, -1); rb_define_method(cLibevAgent, "accept", LibevAgent_accept, 1); rb_define_method(cLibevAgent, "accept_loop", LibevAgent_accept_loop, 1); // rb_define_method(cLibevAgent, "connect", LibevAgent_accept, 3); rb_define_method(cLibevAgent, "wait_io", LibevAgent_wait_io, 2); rb_define_method(cLibevAgent, "sleep", LibevAgent_sleep, 1);