#include "ruby.h"
#ifdef HAVE_RUBY_IO_H
#  include "ruby/io.h"
#else
#  include "rubyio.h"
#endif
/*
 * NOTE(review): the names of the seven system headers below (and of the
 * two Ruby headers included inside the WITHOUT_GVL section) were missing
 * from the text under review.  They were chosen from the identifiers this
 * file uses (fcntl, assert, struct iovec, IOV_MAX/PIPE_BUF, alloca,
 * close/pipe2, errno); confirm against the project's history.
 */
#include <fcntl.h>
#include <assert.h>
#include <sys/uio.h>
#include <limits.h>
#include <alloca.h>
#include <unistd.h>
#include <errno.h>

static VALUE sym_EAGAIN;

/* private flag bit: stripped from the flags before they reach the kernel */
#define WAITALL 0x4000000

/* taken from haproxy */
#define MAX_AT_ONCE (1 << 30)

#ifndef F_LINUX_SPECIFIC_BASE
#  define F_LINUX_SPECIFIC_BASE 1024
#endif

#ifndef F_GETPIPE_SZ
#  define F_SETPIPE_SZ    (F_LINUX_SPECIFIC_BASE + 7)
#  define F_GETPIPE_SZ    (F_LINUX_SPECIFIC_BASE + 8)
#endif

#if ! HAVE_RB_IO_T
#  define rb_io_t OpenFile
#endif

#ifdef GetReadFile
#  define FPTR_TO_FD(fptr) (fileno(GetReadFile(fptr)))
#else
#  if !HAVE_RB_IO_T || (RUBY_VERSION_MAJOR == 1 && RUBY_VERSION_MINOR == 8)
#    define FPTR_TO_FD(fptr) fileno(fptr->f)
#  else
#    define FPTR_TO_FD(fptr) fptr->fd
#  endif
#endif

#ifndef SSIZET2NUM
#  define SSIZET2NUM(x) LONG2NUM(x)
#endif
#ifndef NUM2SSIZET
#  define NUM2SSIZET(x) NUM2LONG(x)
#endif
#ifndef SIZET2NUM
#  define SIZET2NUM(x) ULONG2NUM(x)
#endif
#ifndef NUM2SIZET
#  define NUM2SIZET(x) NUM2ULONG(x)
#endif

/*
 * Resolves +io+ to a file descriptor number.
 *
 * A Fixnum is taken to be the descriptor itself and a T_FILE object is
 * unwrapped with GetOpenFile.  Anything else is converted through
 * "to_io" and the switch is retried on the converted object.
 */
static int my_fileno(VALUE io)
{
	rb_io_t *fptr;

	for (;;) {
		switch (TYPE(io)) {
		case T_FIXNUM: return FIX2INT(io);
		case T_FILE: {
			GetOpenFile(io, fptr);
			return FPTR_TO_FD(fptr);
		}
		default:
			io = rb_convert_type(io, T_FILE, "IO", "to_io");
			/* retry */
		}
	}
}

/*
 * Same as my_fileno, but errno holds the same value after the call as
 * before it, so callers may look up a descriptor between a failed
 * system call and their own inspection of errno.
 */
static int check_fileno(VALUE io)
{
	int saved_errno = errno;
	int fd = my_fileno(io);

	errno = saved_errno;
	return fd;
}

#if defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL) && defined(HAVE_RUBY_THREAD_H)
/* Ruby 2.0+ */
#  include <ruby/thread.h>
#  define WITHOUT_GVL(fn,a,ubf,b) \
	rb_thread_call_without_gvl((fn),(a),(ubf),(b))
#elif defined(HAVE_RB_THREAD_BLOCKING_REGION)
typedef VALUE (*my_blocking_fn_t)(void*);
#  define WITHOUT_GVL(fn,a,ubf,b) \
	rb_thread_blocking_region((my_blocking_fn_t)(fn),(a),(ubf),(b))

#else /* Ruby 1.8 */
/* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
#  include <rubysig.h>
#  define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
typedef void rb_unblock_function_t(void *);
typedef void * rb_blocking_function_t(void *);

/*
 * Runs func(data1) between TRAP_BEG and TRAP_END and returns its result.
 * Only RUBY_UBF_IO is accepted as the unblock function; data2 is unused.
 */
static void *
WITHOUT_GVL(rb_blocking_function_t *func, void *data1,
            rb_unblock_function_t *ubf, void *data2)
{
	void *rv;

	assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");

	TRAP_BEG;
	rv = func(data1);
	TRAP_END;

	return rv;
}
#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */

#ifndef RSTRING_PTR
#  define RSTRING_PTR(s) (RSTRING(s)->ptr)
#endif
#ifndef RSTRING_LEN
#  define RSTRING_LEN(s) (RSTRING(s)->len)
#endif
#ifndef RARRAY_LEN
#  define RARRAY_LEN(s) (RARRAY(s)->len)
#endif

/* runs fn(data) through WITHOUT_GVL with the I/O unblock function */
#define io_run(fn,data) WITHOUT_GVL((fn),(data),RUBY_UBF_IO,0)

/* arguments for one splice(2) call, passed through io_run */
struct splice_args {
	int fd_in;
	int fd_out;
	off_t *off_in;
	off_t *off_out;
	size_t len;
	unsigned flags;
};

/*
 * runs without GVL
 *
 * Caps the request at MAX_AT_ONCE bytes and returns the splice(2)
 * result cast to a pointer; callers cast it back to ssize_t.
 */
static void * nogvl_splice(void *ptr)
{
	struct splice_args *a = ptr;

	if (a->len > MAX_AT_ONCE)
		a->len = MAX_AT_ONCE;

	return (void *)splice(a->fd_in, a->off_in, a->fd_out, a->off_out,
	                      a->len, a->flags);
}

/*
 * Shared implementation of IO.splice and IO.trysplice.
 *
 * argv holds (io_in, off_in, io_out, off_out, len [, flags]); dflags is
 * OR-ed into the caller-supplied flags.  The WAITALL bit is removed
 * before the flags are handed to splice(2) and selects retry-until-done
 * behavior instead.
 *
 * Returns the number of bytes transferred, 0 when splice(2) reported 0
 * before anything was transferred, or a negative value with errno set.
 * With WAITALL, a failure after some bytes were moved returns the bytes
 * moved so far.  EINTR is always retried.
 */
static ssize_t do_splice(int argc, VALUE *argv, unsigned dflags)
{
	off_t i = 0, o = 0;
	off_t i_start, o_start;
	VALUE io_in, off_in, io_out, off_out, len, flags;
	struct splice_args a;
	ssize_t bytes;
	ssize_t total = 0;
	unsigned waitall;

	rb_scan_args(argc, argv, "51",
	             &io_in, &off_in, &io_out, &off_out, &len, &flags);

	a.off_in = NIL_P(off_in) ? NULL : (i = NUM2OFFT(off_in), &i);
	a.off_out = NIL_P(off_out) ? NULL : (o = NUM2OFFT(off_out), &o);
	i_start = i;
	o_start = o;
	a.len = NUM2SIZET(len);
	a.flags = NIL_P(flags) ? dflags : NUM2UINT(flags) | dflags;
	waitall = a.flags & WAITALL;
	if (waitall)
		a.flags ^= WAITALL;

	for (;;) {
		/* looked up on every pass; the IO objects are re-resolved */
		a.fd_in = check_fileno(io_in);
		a.fd_out = check_fileno(io_out);
		bytes = (ssize_t)io_run(nogvl_splice, &a);
		if (bytes < 0) {
			if (errno == EINTR)
				continue;
			if (waitall && errno == EAGAIN) {
				rb_io_wait_readable(check_fileno(io_in));
				/* restore: the wait above may have changed errno */
				errno = EAGAIN;
				rb_io_wait_writable(check_fileno(io_out));
				continue;
			}
			if (total > 0)
				return total;
			return bytes;
		} else if (bytes == 0) {
			break;
		} else if (waitall) {
			total += bytes;
			if ((a.len -= bytes) == 0)
				return total;
			/*
			 * splice(2) receives pointers to i and o and may store
			 * the advanced offset back through them.  Recompute
			 * both from the starting offset so the next call
			 * resumes at start + total either way, instead of
			 * adding bytes on top of an already advanced value.
			 */
			i = i_start + total;
			o = o_start + total;
		} else {
			return bytes;
		}
	}

	return total;
}

/*
 * call-seq:
 *    IO.splice(io_in, off_in, io_out, off_out, len) => integer
 *    IO.splice(io_in, off_in, io_out, off_out, len, flags) => integer
 *
 * Splice +len+ bytes from/to a pipe.  Either +io_in+ or +io_out+
 * MUST be a pipe.  +io_in+ and +io_out+ may BOTH be pipes as of
 * Linux 2.6.31 or later.
 *
 * +off_in+ and +off_out+ if non-nil may be used to
 * specify an offset for the non-pipe file descriptor.
 *
 * +flags+ defaults to zero if unspecified.
 * +flags+ may be a bitmask of the following flags:
 *
 * * IO::Splice::F_MOVE
 * * IO::Splice::F_NONBLOCK
 * * IO::Splice::F_MORE
 *
 * Returns the number of bytes spliced.
 * Raises EOFError when +io_in+ has reached end of file.
 * Raises Errno::EAGAIN if the IO::Splice::F_NONBLOCK flag is set
 * and the pipe has no data to read from or space to write to.  May
 * also raise Errno::EAGAIN if the non-pipe descriptor has no data
 * to read from or space to write to.
 *
 * As splice never exposes buffers to userspace, it will not take
 * into account userspace buffering done by Ruby or stdio.  It is
 * also not subject to encoding/decoding filters under Ruby 1.9.
 *
 * Consider using IO.trysplice if +io_out+ is a pipe or if you are using
 * non-blocking I/O on both descriptors as it avoids the cost of raising
 * common Errno::EAGAIN exceptions.
* * See manpage for full documentation: * http://kernel.org/doc/man-pages/online/pages/man2/splice.2.html */ static VALUE my_splice(int argc, VALUE *argv, VALUE self) { ssize_t n = do_splice(argc, argv, 0); if (n == 0) rb_eof_error(); if (n < 0) rb_sys_fail("splice"); return SSIZET2NUM(n); } /* * call-seq: * IO.trysplice(io_in, off_in, io_out, off_out, len) => integer * IO.trysplice(io_in, off_in, io_out, off_out, len, flags) => integer * * Exactly like IO.splice, except +:EAGAIN+ is returned when either * the read or write end would block instead of raising Errno::EAGAIN. * * IO::Splice::F_NONBLOCK is always passed for the pipe descriptor, * but this can still block if the non-pipe descriptor is blocking. * * See IO.splice documentation for more details. * * This method is recommended whenever +io_out+ is a pipe. */ static VALUE trysplice(int argc, VALUE *argv, VALUE self) { ssize_t n = do_splice(argc, argv, SPLICE_F_NONBLOCK); if (n == 0) return Qnil; if (n < 0) { if (errno == EAGAIN) return sym_EAGAIN; rb_sys_fail("splice"); } return SSIZET2NUM(n); } struct tee_args { int fd_in; int fd_out; size_t len; unsigned flags; }; /* runs without GVL */ static void * nogvl_tee(void *ptr) { struct tee_args *a = ptr; if (a->len > MAX_AT_ONCE) a->len = MAX_AT_ONCE; return (void *)tee(a->fd_in, a->fd_out, a->len, a->flags); } static ssize_t do_tee(int argc, VALUE *argv, unsigned dflags) { VALUE io_in, io_out, len, flags; struct tee_args a; ssize_t bytes; ssize_t total = 0; unsigned waitall; rb_scan_args(argc, argv, "31", &io_in, &io_out, &len, &flags); a.len = (size_t)NUM2SIZET(len); a.flags = NIL_P(flags) ? 
dflags : NUM2UINT(flags) | dflags; waitall = a.flags & WAITALL; if (waitall) a.flags ^= WAITALL; for (;;) { a.fd_in = check_fileno(io_in); a.fd_out = check_fileno(io_out); bytes = (ssize_t)io_run(nogvl_tee, &a); if (bytes < 0) { if (errno == EINTR) continue; if (waitall && errno == EAGAIN) { rb_io_wait_readable(check_fileno(io_in)); errno = EAGAIN; rb_io_wait_writable(check_fileno(io_out)); continue; } if (total > 0) return total; return bytes; } else if (bytes == 0) { break; } else if (waitall) { total += bytes; if ((a.len -= bytes) == 0) return total; } else { return bytes; } } return total; } /* * call-seq: * IO.tee(io_in, io_out, len) => integer * IO.tee(io_in, io_out, len, flags) => integer * * Copies up to +len+ bytes of data from +io_in+ to +io_out+. +io_in+ * and +io_out+ must both refer to pipe descriptors. +io_in+ and +io_out+ * may not be endpoints of the same pipe. * * +flags+ may be zero (the default) or a combination of: * * IO::Splice::F_NONBLOCK * * Other IO::Splice flags are currently unimplemented or have no effect. * * Returns the number of bytes duplicated if successful. * Raises EOFError when +io_in+ is closed and emptied. * Raises Errno::EAGAIN when +io_in+ is empty and/or +io_out+ is full * and +flags+ contains IO::Splice::F_NONBLOCK * * Consider using IO.trytee if you are using IO::Splice::F_NONBLOCK * as it avoids the cost of raising common Errno::EAGAIN exceptions. * * See manpage for full documentation: * http://kernel.org/doc/man-pages/online/pages/man2/tee.2.html */ static VALUE my_tee(int argc, VALUE *argv, VALUE self) { ssize_t n = do_tee(argc, argv, 0); if (n == 0) rb_eof_error(); if (n < 0) rb_sys_fail("tee"); return SSIZET2NUM(n); } /* * call-seq: * IO.trytee(io_in, io_out, len) => integer * IO.trytee(io_in, io_out, len, flags) => integer * * Exactly like IO.tee, except +:EAGAIN+ is returned when either * the read or write end would block instead of raising Errno::EAGAIN. 
* * IO::Splice::F_NONBLOCK is always passed for the pipe descriptor, * but this can still block if the non-pipe descriptor is blocking. * * See IO.tee documentation for more details. */ static VALUE trytee(int argc, VALUE *argv, VALUE self) { ssize_t n = do_tee(argc, argv, SPLICE_F_NONBLOCK); if (n == 0) return Qnil; if (n < 0) { if (errno == EAGAIN) return sym_EAGAIN; rb_sys_fail("tee"); } return SSIZET2NUM(n); } struct vmsplice_args { int fd; unsigned flags; struct iovec *iov; unsigned long nr_segs; }; static void * nogvl_vmsplice(void *ptr) { struct vmsplice_args *a = ptr; return (void *)vmsplice(a->fd, a->iov, a->nr_segs, a->flags); } /* this can't be a function since we use alloca() */ #define ARY2IOVEC(iov,iovcnt,expect,ary) \ do { \ struct iovec *tmp; \ unsigned long i; \ iovcnt = (unsigned long)RARRAY_LEN(ary); \ if (iovcnt > IOV_MAX) \ rb_raise(rb_eArgError, "array is larger than IOV_MAX"); \ iov = tmp = alloca(sizeof(struct iovec) * iovcnt); \ expect = 0; \ for (i = 0; i < iovcnt; tmp++, i++) { \ VALUE cur = rb_ary_entry(ary, (long)i); \ Check_Type(cur, T_STRING); \ tmp->iov_base = RSTRING_PTR(cur); \ tmp->iov_len = RSTRING_LEN(cur); \ expect += tmp->iov_len; \ } \ } while (0) static void advance_vmsplice_args(struct vmsplice_args *a, long n) { struct iovec *new_iov = a->iov; unsigned long i; /* skip over iovecs we've already written completely */ for (i = 0; i < a->nr_segs; i++, new_iov++) { if (n == 0) break; /* * partially written iov, * modify and retry with current iovec in * front */ if (new_iov->iov_len > (size_t)n) { VALUE base = (VALUE)new_iov->iov_base; new_iov->iov_len -= n; new_iov->iov_base = (void *)(base + n); break; } n -= new_iov->iov_len; } /* setup to retry without the already-written iovecs */ a->nr_segs -= i; a->iov = new_iov; } /* * call-seq: * IO.vmsplice(io, string_array) => integer * IO.vmsplice(io, string_array, flags) => integer * IO.vmsplice(io, string) => integer * IO.vmsplice(io, string, flags) => integer * * Transfers an 
array of strings into the pipe descriptor given by io. * +io+ must be the writable end of a pipe. * * This may allow the kernel to avoid data copies in some cases. * but is (probably) of limited usefulness in Ruby. If you have * use cases or ideas for making this more useful for Ruby users, * please tell us at ruby-io-splice@bogomips.org! * * Also consider the "sendfile" RubyGem or IO.copy_stream in Ruby 1.9 * if you want to do zero-copy file transfers to pipes or sockets. As * of Linux 2.6.33, sendfile(2) can copy to any output descriptor, * not just sockets. * * See manpage for full documentation: * http://kernel.org/doc/man-pages/online/pages/man2/vmsplice.2.html */ static VALUE my_vmsplice(int argc, VALUE * argv, VALUE self) { ssize_t rv = 0; ssize_t left; struct vmsplice_args a; struct iovec iov; ssize_t n; VALUE io, data, flags; rb_scan_args(argc, argv, "21", &io, &data, &flags); switch (TYPE(data)) { case T_STRING: iov.iov_base = RSTRING_PTR(data); iov.iov_len = (size_t)(left = (ssize_t)RSTRING_LEN(data)); a.iov = &iov; a.nr_segs = 1; break; case T_ARRAY: ARY2IOVEC(a.iov, a.nr_segs, left, data); break; default: rb_raise(rb_eTypeError, "wrong argument type %s " "(expected a String or Array of strings)", rb_obj_classname(data)); } a.flags = NIL_P(flags) ? 0 : NUM2UINT(flags); for (;;) { a.fd = check_fileno(io); n = (ssize_t)io_run(nogvl_vmsplice, &a); if (n < 0) { if (errno == EAGAIN) { if (a.flags & SPLICE_F_NONBLOCK) rb_sys_fail("vmsplice"); if (rb_io_wait_writable(check_fileno(io))) continue; /* fall through on error */ } /* * unlikely to hit this case, return the * already written bytes, we'll let the next * write (or close) fail instead */ if (rv > 0) break; if (errno == EINTR) continue; rb_sys_fail("vmsplice"); } rv += n; left -= n; if (left == 0) break; advance_vmsplice_args(&a, n); } return SSIZET2NUM(rv); } /* * call-seq: * reader, writer = IO.pipe * reader.pipe_size => integer * * Returns the pipe capacity of the underlying pipe in bytes. 
The * default capacity is 65536 bytes since Linux 2.6.11, and 4096 bytes * in previous kernels. * * Since the pipe is a circular buffer in the same kernel, the size * of the reader is exactly the same as the size of the writer. * * This method is only exposed on Linux 2.6.35 or later. */ static VALUE pipe_size(VALUE self) { int size = fcntl(my_fileno(self), F_GETPIPE_SZ); if (size < 0) rb_sys_fail("fcntl(F_GETPIPE_SZ)"); return INT2NUM(size); } /* * call-seq: * reader, writer = IO.pipe * reader.pipe_size = integer * * Sets and returns the pipe capacity of the underlying pipe in bytes. * * This MUST be a power-of-two, or Errno::EINVAL will be raised. * Linux will silently increase this to be equal to the page size * (4096 bytes on most architectures) if the specified value is * less than the size of a page. * * For users without CAP_SYS_RESOURCE, this raises Errno::EPERM when * attempting to specify a value greater than the value in * /proc/sys/fs/pipe-max-size. * * Since the pipe is a circular buffer in the same kernel, the size * of the reader is exactly the same as the size of the writer. * * Raises Errno::EBUSY if the assigned value is less than * the currently filled portion of the pipe. * * This method is only exposed on Linux 2.6.35 or later. */ static VALUE set_pipe_size(VALUE self, VALUE size) { int fd = my_fileno(self); int bytes = NUM2INT(size); int rv = fcntl(fd, F_SETPIPE_SZ, bytes); if (rv < 0) { if (errno == ENOMEM) { rb_gc(); rv = fcntl(fd, F_SETPIPE_SZ, bytes); } if (rv < 0) rb_sys_fail("fcntl(F_SETPIPE_SZ)"); } return size; } static int can_mod_pipe_size(void) { /* * pipe2 appeared in Linux 2.6.27, F_*PIPE_SZ appeared in 2.6.35, * thus not having pipe2 automatically disqualifies us from having * F_*PIPE_SZ support */ #ifdef HAVE_PIPE2 int fds[2]; int rc = pipe2(fds, O_CLOEXEC); if (rc == 0) { rc = fcntl(fds[0], F_GETPIPE_SZ); rc = rc < 0 ? 
0 : 1; (void)close(fds[0]); (void)close(fds[1]); } else { /* * weird error, but don't raise during init, this could be * ENOSYS, even.. */ rc = 0; } errno = 0; return rc; #else /* ! HAVE_PIPE2 */ return 0; #endif /* ! HAVE_PIPE2 */ } #define NODOC_CONST(klass,name,value) \ rb_define_const((klass),(name),(value)) void Init_io_splice_ext(void) { VALUE mSplice = rb_define_module_under(rb_cIO, "Splice"); rb_define_singleton_method(rb_cIO, "splice", my_splice, -1); rb_define_singleton_method(rb_cIO, "trysplice", trysplice, -1); rb_define_singleton_method(rb_cIO, "tee", my_tee, -1); rb_define_singleton_method(rb_cIO, "trytee", trytee, -1); rb_define_singleton_method(rb_cIO, "vmsplice", my_vmsplice, -1); /* * Attempt to move pages instead of copying. This is only a hint * and support for it was removed in Linux 2.6.21. It will be * re-added for FUSE filesystems only in Linux 2.6.35. */ rb_define_const(mSplice, "F_MOVE", UINT2NUM(SPLICE_F_MOVE)); assert(WAITALL != SPLICE_F_MOVE && "WAITALL == F_MOVE"); /* * Do not block on pipe I/O. This flag only affects the pipe(s) * being spliced from/to and has no effect on the non-pipe * descriptor (which requires non-blocking operation to be set * explicitly). * * The non-blocking flag (O_NONBLOCK) on the pipe descriptors * themselves are ignored by this family of functions, and * using this flag is the only way to get non-blocking operation * out of them. * * It is highly recommended this flag be set (or IO.trysplice used) * whenever splicing from a socket into a pipe unless there is * another (native) thread or process doing a blocking read on that * pipe. Otherwise it is possible to block a single-threaded process * if the socket buffers are larger than the pipe buffers. */ rb_define_const(mSplice, "F_NONBLOCK", UINT2NUM(SPLICE_F_NONBLOCK)); assert(WAITALL != SPLICE_F_NONBLOCK && "WAITALL == F_NONBLOCK"); /* * Indicate that there may be more data coming into the outbound * descriptor. 
This can allow the kernel to avoid sending partial * frames from sockets. Currently only used with splice. */ rb_define_const(mSplice, "F_MORE", UINT2NUM(SPLICE_F_MORE)); assert(WAITALL != SPLICE_F_MORE && "WAITALL == F_MORE"); /* * Only usable by vmsplice. This flag probably not useful in the * context of Ruby applications which cannot control alignment. */ rb_define_const(mSplice, "F_GIFT", UINT2NUM(SPLICE_F_GIFT)); assert(WAITALL != SPLICE_F_GIFT && "WAITALL == F_GIFT"); /* * Retry until the requested transfer is complete, this will * cause IO.splice/IO.tee to never return less than the requested * transfer size unless an error occored. * * IO.vmsplice always defaults to this behavior. */ NODOC_CONST(mSplice, "WAITALL", UINT2NUM(WAITALL)); /* * The maximum size of an atomic write to a pipe * POSIX requires this to be at least 512 bytes. * Under Linux, this is 4096 bytes. */ rb_define_const(mSplice, "PIPE_BUF", UINT2NUM(PIPE_BUF)); /* * The maximum size we're allowed to splice at once. Larger * sizes will be broken up and retried if the WAITALL flag or * IO::Splice.copy_stream is used. */ rb_define_const(mSplice, "MAX_AT_ONCE", SIZET2NUM(MAX_AT_ONCE)); if (can_mod_pipe_size()) { rb_define_method(rb_cIO, "pipe_size", pipe_size, 0); rb_define_method(rb_cIO, "pipe_size=", set_pipe_size, 1); /* * fcntl() command constant used to return the size of a pipe. * This constant is only defined when running Linux 2.6.35 * or later. For convenience, use IO#pipe_size instead. */ rb_define_const(mSplice, "F_GETPIPE_SZ", UINT2NUM(F_GETPIPE_SZ)); /* * fcntl() command constant used to set the size of a pipe. * This constant is only defined when running Linux 2.6.35 * or later. For convenience, use IO#pipe_size= instead. */ rb_define_const(mSplice, "F_SETPIPE_SZ", UINT2NUM(F_SETPIPE_SZ)); } sym_EAGAIN = ID2SYM(rb_intern("EAGAIN")); }