ext/polyphony/libev_agent.c in polyphony-0.43.5 vs ext/polyphony/libev_agent.c in polyphony-0.43.6
- old
+ new
@@ -1,7 +1,8 @@
#include <netdb.h>
#include <sys/socket.h>
+#include <sys/uio.h>
#include "polyphony.h"
#include "../libev/ev.h"
VALUE cLibevAgent = Qnil;
@@ -411,68 +412,134 @@
return io;
error:
return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result);
}
-VALUE LibevAgent_write(int argc, VALUE *argv, VALUE self) {
+VALUE LibevAgent_write(VALUE self, VALUE io, VALUE str) {
struct LibevAgent_t *agent;
struct libev_io watcher;
rb_io_t *fptr;
VALUE switchpoint_result = Qnil;
- VALUE io;
VALUE underlying_io;
+ char *buf = StringValuePtr(str);
+ long len = RSTRING_LEN(str);
+ long left = len;
+
+ underlying_io = rb_iv_get(io, "@io");
+ if (underlying_io != Qnil) io = underlying_io;
+ GetLibevAgent(self, agent);
+ io = rb_io_get_write_io(io);
+ GetOpenFile(io, fptr);
+ watcher.fiber = Qnil;
+
+ while (left > 0) {
+ ssize_t n = write(fptr->fd, buf, left);
+ if (n < 0) {
+ int e = errno;
+ if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
+ switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE);
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
+ }
+ else {
+ buf += n;
+ left -= n;
+ }
+ }
+
+ if (watcher.fiber == Qnil) {
+ switchpoint_result = libev_snooze();
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
+ }
+
+ RB_GC_GUARD(watcher.fiber);
+ RB_GC_GUARD(switchpoint_result);
+
+ return INT2NUM(len);
+error:
+ return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result);
+}
+
+VALUE LibevAgent_writev(VALUE self, VALUE io, int argc, VALUE *argv) {
+ struct LibevAgent_t *agent;
+ struct libev_io watcher;
+ rb_io_t *fptr;
+ VALUE switchpoint_result = Qnil;
+ VALUE underlying_io;
+ long total_length = 0;
long total_written = 0;
- int arg_idx = 1;
+ struct iovec *iov = 0;
+ struct iovec *iov_ptr = 0;
+ int iov_count = argc;
- if (argc < 2)
- rb_raise(rb_eRuntimeError, "(wrong number of arguments (expected 2 or more))");
-
- io = argv[0];
underlying_io = rb_iv_get(io, "@io");
if (underlying_io != Qnil) io = underlying_io;
GetLibevAgent(self, agent);
io = rb_io_get_write_io(io);
GetOpenFile(io, fptr);
watcher.fiber = Qnil;
- while (arg_idx < argc) {
- VALUE str = argv[arg_idx];
- char *buf = StringValuePtr(str);
- long len = RSTRING_LEN(str);
- long left = len;
+ iov = malloc(iov_count * sizeof(struct iovec));
+ for (int i = 0; i < argc; i++) {
+ VALUE str = argv[i];
+ iov[i].iov_base = StringValuePtr(str);
+ iov[i].iov_len = RSTRING_LEN(str);
+ total_length += iov[i].iov_len;
+ }
+ iov_ptr = iov;
- while (left > 0) {
- ssize_t n = write(fptr->fd, buf, left);
- if (n < 0) {
- int e = errno;
- if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
+ while (1) {
+ ssize_t n = writev(fptr->fd, iov_ptr, iov_count);
+ if (n < 0) {
+ int e = errno;
+ if ((e != EWOULDBLOCK && e != EAGAIN)) rb_syserr_fail(e, strerror(e));
- switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE);
- if (TEST_EXCEPTION(switchpoint_result)) goto error;
+ switchpoint_result = libev_io_wait(agent, &watcher, fptr, EV_WRITE);
+ if (TEST_EXCEPTION(switchpoint_result)) goto error;
+ }
+ else {
+ total_written += n;
+ if (total_written == total_length) break;
+
+ while (n > 0) {
+ if ((size_t) n < iov_ptr[0].iov_len) {
+ iov_ptr[0].iov_base = (char *) iov_ptr[0].iov_base + n;
+ iov_ptr[0].iov_len -= n;
+ n = 0;
+ }
+ else {
+ n -= iov_ptr[0].iov_len;
+ iov_ptr += 1;
+ iov_count -= 1;
+ }
}
- else {
- buf += n;
- left -= n;
- }
}
- total_written += len;
- arg_idx++;
}
-
if (watcher.fiber == Qnil) {
switchpoint_result = libev_snooze();
if (TEST_EXCEPTION(switchpoint_result)) goto error;
}
RB_GC_GUARD(watcher.fiber);
RB_GC_GUARD(switchpoint_result);
+ free(iov);
return INT2NUM(total_written);
error:
+ free(iov);
return rb_funcall(rb_mKernel, ID_raise, 1, switchpoint_result);
}
+VALUE LibevAgent_write_m(int argc, VALUE *argv, VALUE self) {
+ if (argc < 2)
+ // TODO: raise ArgumentError
+ rb_raise(rb_eRuntimeError, "(wrong number of arguments (expected 2 or more))");
+
+ return (argc == 2) ?
+ LibevAgent_write(self, argv[0], argv[1]) :
+ LibevAgent_writev(self, argv[0], argc - 1, argv + 1);
+}
+
///////////////////////////////////////////////////////////////////////////
VALUE LibevAgent_accept(VALUE self, VALUE sock) {
struct LibevAgent_t *agent;
struct libev_io watcher;
@@ -736,10 +803,10 @@
rb_define_method(cLibevAgent, "poll", LibevAgent_poll, 3);
rb_define_method(cLibevAgent, "break", LibevAgent_break, 0);
rb_define_method(cLibevAgent, "read", LibevAgent_read, 4);
rb_define_method(cLibevAgent, "read_loop", LibevAgent_read_loop, 1);
- rb_define_method(cLibevAgent, "write", LibevAgent_write, -1);
+ rb_define_method(cLibevAgent, "write", LibevAgent_write_m, -1);
rb_define_method(cLibevAgent, "accept", LibevAgent_accept, 1);
rb_define_method(cLibevAgent, "accept_loop", LibevAgent_accept_loop, 1);
// rb_define_method(cLibevAgent, "connect", LibevAgent_accept, 3);
rb_define_method(cLibevAgent, "wait_io", LibevAgent_wait_io, 2);
rb_define_method(cLibevAgent, "sleep", LibevAgent_sleep, 1);