# -*- encoding: binary -*-
require 'test/unit'
require 'tempfile'
require 'socket'
require 'io/nonblock'
require 'timeout'
$-w = true
require 'io/splice'

# unused_port provides an unused port on +addr+ usable for TCP that is
# guaranteed to be unused across all unicorn builds on that system.  It
# prevents race conditions by using a lock file other unicorn builds
# will see.  This is required if you perform several builds in parallel
# with a continuous integration system or run tests in parallel via
# gmake.  This is NOT guaranteed to be race-free if you run other
# processes that bind to random ports for testing (but the window
# for a race condition is very small).
#
# Returns the chosen port number.  Re-raises Errno::EADDRINUSE or
# Errno::EACCES if no port could be bound after all retries are used up.
def unused_port(addr = '127.0.0.1')
  retries = 100
  base = 5000
  port = sock = nil
  begin
    begin
      # pick a random port in [base, 32768), never 8080
      port = base + rand(32768 - base)
      while port == 8080
        port = base + rand(32768 - base)
      end

      sock = Socket.new(Socket::AF_INET, Socket::SOCK_STREAM, 0)
      sock.bind(Socket.pack_sockaddr_in(port, addr))
      sock.listen(5)
    rescue Errno::EADDRINUSE, Errno::EACCES
      sock.close rescue nil
      # give up loudly instead of falling through and handing back a
      # port that was never successfully bound
      raise if (retries -= 1) < 0
      retry
    end

    # Since we'll end up closing the random port we just got, there's a
    # race condition that could allow the random port we just chose to
    # reselect itself when running tests in parallel with gmake.  Create
    # a lock file while we have the port here to ensure that does not
    # happen.
    lock_path = "#{Dir::tmpdir}/unicorn_test.#{addr}:#{port}.lock"

    # only the existence of the file (File::EXCL) acts as the lock, so
    # the handle is closed right away instead of being leaked
    File.open(lock_path, File::WRONLY|File::CREAT|File::EXCL, 0600).close
    at_exit { File.unlink(lock_path) rescue nil }
  rescue Errno::EEXIST
    # somebody else holds the lock file for this port, pick another one
    sock.close rescue nil
    retry
  end
  sock.close rescue nil
  port
end

# Exercises IO.splice, IO.tee, IO.vmsplice, IO::Splice.copy_stream and
# IO#pipe_size using pipes, Tempfiles and local TCP sockets.
class Test_IO_Splice < Test::Unit::TestCase

  # file -> pipe, using raw file descriptors
  def test_splice
    str = 'abcde'
    size = 5
    rd, wr = IO.pipe
    tmp = Tempfile.new('ruby_io_splice')

    assert_nothing_raised {
      tmp.syswrite(str)
      tmp.sysseek(0)
    }

    nr = IO.splice(tmp.fileno, nil, wr.fileno, nil, size, 0)
    assert_equal size, nr
    assert_equal str, rd.sysread(size)
  end

  # file -> pipe, passing the IO objects themselves
  def test_splice_io
    str = 'abcde'
    size = 5
    rd, wr = IO.pipe
    tmp = Tempfile.new('ruby_io_splice')

    assert_nothing_raised {
      tmp.syswrite(str)
      tmp.sysseek(0)
    }

    nr = IO.splice(tmp, nil, wr, nil, size, 0)
    assert_equal size, nr
    assert_equal str, rd.sysread(size)
  end

  # file -> pipe, where the source is a non-IO object responding to #to_io
  def test_splice_io_ish
    str = 'abcde'
    size = 5
    rd, wr = IO.pipe
    tmp = Tempfile.new('ruby_io_splice')
    io_ish = [ tmp ]
    def io_ish.to_io
      first.to_io
    end

    assert_nothing_raised {
      tmp.syswrite(str)
      tmp.sysseek(0)
    }

    nr = IO.splice(io_ish, nil, wr, nil, size, 0)
    assert_equal size, nr
    assert_equal str, rd.sysread(size)
  end

  # an explicit input offset of 3 skips 'abc' and yields 'de'
  def test_splice_in_offset
    str = 'abcde'
    off = 3
    len = 2
    rd, wr = IO.pipe
    tmp = Tempfile.new('ruby_io_splice')

    assert_nothing_raised {
      tmp.syswrite(str)
      tmp.sysseek(0)
    }

    nr = IO.splice(tmp.fileno, off, wr.fileno, nil, len, 0)
    assert_equal len, nr
    assert_equal 'de', rd.sysread(len)
  end

  # an explicit output offset of 3 leaves three NUL bytes before the data
  def test_splice_out_offset
    str = 'abcde'
    rd, wr = IO.pipe
    tmp = Tempfile.new('ruby_io_splice')

    assert_nothing_raised { wr.syswrite(str) }
    nr = IO.splice(rd.fileno, nil, tmp.fileno, 3, str.size, 0)
    assert_equal 5, nr
    assert_nothing_raised { tmp.sysseek(0) }
    assert_equal "\0\0\0abcde", tmp.sysread(9)
  end

  # splicing from an empty pipe with F_NONBLOCK raises EAGAIN
  def test_splice_nonblock
    rd, wr = IO.pipe
    tmp = Tempfile.new('ruby_io_splice')

    assert_raises(Errno::EAGAIN) {
      IO.splice(rd.fileno, nil, tmp.fileno, 0, 5, IO::Splice::F_NONBLOCK)
    }
  end

  # once the writer is closed and the pipe is drained, EOFError is raised
  def test_splice_eof
    rd, wr = IO.pipe
    tmp = Tempfile.new('ruby_io_splice')
    wr.syswrite 'abc'
    wr.close

    nr = IO.splice(rd.fileno, nil, tmp.fileno, 0, 5, IO::Splice::F_NONBLOCK)
    assert_equal 3, nr
    assert_raises(EOFError) {
      IO.splice(rd.fileno, nil, tmp.fileno, 0, 5, IO::Splice::F_NONBLOCK)
    }
  end

  # a non-blocking socket with nothing to read raises EAGAIN even though
  # no F_NONBLOCK flag is passed to IO.splice
  def test_splice_nonblock_socket
    port = unused_port
    server = TCPServer.new('127.0.0.1', port)
    rp, wp = IO.pipe
    rs = TCPSocket.new('127.0.0.1', port)
    rs.nonblock = true
    assert_raises(Errno::EAGAIN) { IO.splice(rs, nil, wp, nil, 1024, 0) }
    rs.close
    server.close
  end

  # after IO.tee the data is readable from both the source and the
  # destination pipe
  def test_tee
    str = 'abcde'
    size = 5
    rda, wra = IO.pipe
    rdb, wrb = IO.pipe

    assert_nothing_raised { wra.syswrite(str) }
    nr = IO.tee(rda.fileno, wrb.fileno, size, 0)
    assert_equal 5, nr
    assert_equal str, rdb.sysread(5)
    assert_equal str, rda.sysread(5)
  end

  def test_tee_eof
    rda, wra = IO.pipe
    rdb, wrb = IO.pipe
    wra.close
    assert_raises(EOFError) { IO.tee(rda.fileno, wrb.fileno, 4096, 0) }
  end

  def test_tee_nonblock
    rda, wra = IO.pipe
    rdb, wrb = IO.pipe
    assert_raises(Errno::EAGAIN) {
      IO.tee(rda.fileno, wrb.fileno, 4096, IO::Splice::F_NONBLOCK)
    }
  end

  # same as test_tee, but passing IO objects instead of descriptors
  def test_tee_io
    str = 'abcde'
    size = 5
    rda, wra = IO.pipe
    rdb, wrb = IO.pipe

    assert_nothing_raised { wra.syswrite(str) }
    nr = IO.tee(rda, wrb, size, 0)
    assert_equal 5, nr
    assert_equal str, rdb.sysread(5)
    assert_equal str, rda.sysread(5)
  end

  def test_vmsplice_array
    data = %w(hello world how are you today)
    r, w = IO.pipe
    n = IO.vmsplice(w.fileno, data, 0)
    assert_equal data.join('').size, n
    assert_equal data.join(''), r.readpartial(16384)
  end

  def test_vmsplice_string
    r, w = IO.pipe
    assert_equal 5, IO.vmsplice(w, 'hello', 0)
    assert_equal 'hello', r.read(5)
  end

  def test_vmsplice_array_io
    data = %w(hello world how are you today)
    r, w = IO.pipe
    n = IO.vmsplice(w, data, 0)
    assert_equal data.join('').size, n
    assert_equal data.join(''), r.readpartial(16384)
  end

  # the pipe is filled with PIPE_CAPA bytes first, so a non-blocking
  # vmsplice has no room left and raises EAGAIN
  def test_vmsplice_nonblock
    data = %w(hello world how are you today)
    r, w = IO.pipe
    w.syswrite('.' * IO::Splice::PIPE_CAPA)
    assert_raises(Errno::EAGAIN) {
      IO.vmsplice(w.fileno, data, IO::Splice::F_NONBLOCK)
    }
  end

  # a forked child slowly drains the pipe and reports via its exit status
  # whether it received exactly the bytes the parent vmspliced
  def test_vmsplice_in_full
    empty = ""

    # bs * count should be > PIPE_BUF
    [ [ 512, 512 ], [ 131073, 3 ], [ 4098, 64 ] ].each do |(bs,count)|
      rd, wr = IO.pipe
      buf = File.open('/dev/urandom', 'rb') { |fp| fp.sysread(bs) }
      vec = (1..count).map { buf }
      pid = fork do
        wr.close
        tmp = []
        begin
          sleep 0.005
          tmp << rd.readpartial(8192)
        rescue EOFError
          break
        end while true
        ok = (vec.join(empty) == tmp.join(empty))
        exit! ok
      end
      assert_nothing_raised { rd.close }
      assert_equal(bs * count, IO.vmsplice(wr.fileno, vec, 0))
      assert_nothing_raised { wr.close }
      _, status = Process.waitpid2(pid)
      assert status.success?
    end
  end

  def test_vmsplice_nil
    data = %w(hello world how are you today)
    assert_raises(TypeError) { IO.vmsplice(nil, data, 0) }
  end

  def test_constants
    assert IO::Splice::PIPE_BUF > 0
    %w(move nonblock more gift).each { |x|
      assert Integer === IO::Splice.const_get("F_#{x.upcase}")
    }
    assert IO::Splice::PIPE_CAPA >= IO::Splice::PIPE_BUF
  end

  def test_splice_copy_stream_file_to_file_small
    a, b = Tempfile.new('a'), Tempfile.new('b')
    a.syswrite 'hello world'
    a.sysseek(0)
    IO::Splice.copy_stream(a, b)
    b.rewind
    assert_equal 'hello world', b.read
  end

  # the payload is larger than PIPE_CAPA
  def test_splice_copy_stream_file_to_file_big
    buf = ('ab' * IO::Splice::PIPE_CAPA) + 'hi'
    a, b = Tempfile.new('a'), Tempfile.new('b')
    a.syswrite buf
    a.sysseek(0)
    IO::Splice.copy_stream(a, b)
    b.rewind
    assert_equal buf, b.read
  end

  # only the first +nr+ bytes of a larger file are requested
  def test_splice_copy_stream_file_to_file_big_partial
    nr = IO::Splice::PIPE_CAPA
    buf = ('ab' * nr) + 'hi'
    a, b = Tempfile.new('a'), Tempfile.new('b')
    a.syswrite buf
    a.sysseek(0)
    assert_equal nr, IO::Splice.copy_stream(a, b, nr)
    b.rewind
    assert_equal('ab' * (nr/2), b.read)
  end

  def test_splice_copy_stream_file_to_file_len
    a, b = Tempfile.new('a'), Tempfile.new('b')
    a.syswrite 'hello world'
    a.sysseek(0)
    IO::Splice.copy_stream(a, b, 5)
    b.rewind
    assert_equal 'hello', b.read
  end

  def test_splice_copy_stream_pipe_to_file_len
    a = Tempfile.new('a')
    r, w = IO.pipe
    w.syswrite 'hello world'
    IO::Splice.copy_stream(r, a, 5)
    a.rewind
    assert_equal 'hello', a.read
  end

  # source and destination are given as path names rather than IO objects
  def test_splice_copy_stream_paths
    a = Tempfile.new('a')
    b = Tempfile.new('a')
    a.syswrite('hello world')
    IO::Splice.copy_stream(a.path, b.path, 5)
    assert_equal 'hello', b.read
  end

  # the fourth argument is the offset to start reading the source from
  def test_splice_copy_stream_src_offset
    a = Tempfile.new('a')
    b = Tempfile.new('a')
    a.syswrite('hello world')
    IO::Splice.copy_stream(a.path, b.path, 5, 6)
    assert_equal 'world', b.read
  end

  # nothing is ever written to the socket, so copy_stream is expected to
  # still be waiting when the timeout fires, having copied nothing
  def test_copy_stream_nonblock_src
    port = unused_port
    server = TCPServer.new('127.0.0.1', port)
    rp, wp = IO.pipe
    rs = TCPSocket.new('127.0.0.1', port)
    rs.nonblock = true
    nr = 0
    assert_raises(Timeout::Error) do
      # Timeout.timeout instead of the deprecated Object#timeout
      Timeout.timeout(0.05) do
        nr += IO::Splice.copy_stream(rs, wp, 5)
      end
    end
    assert_equal 0, nr
    rs.close
    server.close
  end

  # keeps refilling the pipe and copying it into a socket nobody reads
  # from until copy_stream stalls and the timeout fires; everything
  # counted in +nr+ must then be readable by the accepted peer
  def test_copy_stream_nonblock_dst
    port = unused_port
    server = TCPServer.new('127.0.0.1', port)
    rp, wp = IO.pipe
    rs = TCPSocket.new('127.0.0.1', port)
    rs.nonblock = true
    client = server.accept
    buf = ' ' * IO::Splice::PIPE_CAPA
    nr = 0
    assert_raises(Timeout::Error) do
      loop do
        begin
          wp.write_nonblock(buf)
        rescue Errno::EAGAIN
        end
        # Timeout.timeout instead of the deprecated Object#timeout
        Timeout.timeout(0.05) do
          nr += IO::Splice.copy_stream(rp, rs, IO::Splice::PIPE_CAPA)
        end
      end
    end
    assert_equal nr, client.read(nr).size
    rs.close
    server.close
  end

  def test_copy_stream_eof
    r, w = IO.pipe
    w.syswrite 'hello world'
    w.close
    a = Tempfile.new('a')
    assert_equal 11, IO::Splice.copy_stream(r, a)
    a.rewind
    assert_equal 'hello world', a.read
  end

  # only defined when IO#pipe_size is available
  def test_pipe_size
    r, w = IO.pipe
    # assert_kind_of/assert_equal are used here because a two-argument
    # assert(Integer, x) / assert(8192, x) treats the second argument as
    # the failure message and always passes
    assert_kind_of Integer, r.pipe_size
    assert(r.pipe_size >= 512)
    assert_nothing_raised { w.pipe_size = 8192 }
    assert_equal 8192, r.pipe_size

    # with 4097 bytes buffered, shrinking the pipe to 4096 must fail
    w.write('*' * 4097)
    assert_raises(Errno::EBUSY) { r.pipe_size = 4096 }

    pipe_max_size = File.read("/proc/sys/fs/pipe-max-size").to_i
    assert_nothing_raised { r.pipe_size = pipe_max_size }
    assert_raises(Errno::EPERM) { r.pipe_size = pipe_max_size * 2 }
  end if IO.method_defined?(:pipe_size)
end