# Copyright (C) 2013, Eric Wong and all contributors # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) require_relative 'helper' require 'timeout' class TestWbuf < Testcase parallelize_me! def test_wbuf buf = "*" * (16384 * 2) nr = 1000 [ true, false ].each do |persist| wbuf = Yahns::Wbuf.new([], persist) a, b = UNIXSocket.pair assert_nil wbuf.wbuf_write(a, "HIHI") assert_equal "HIHI", b.read(4) nr.times { wbuf.wbuf_write(a, buf) } assert_equal :wait_writable, wbuf.wbuf_flush(a) done = IO.pipe thr = Thread.new do rv = [] until rv[-1] == persist IO.select(nil, [a]) tmp = wbuf.wbuf_flush(a) rv << tmp end done[1].syswrite '.' rv end wait = true begin if wait r = IO.select([b,done[0]], nil, nil, 5) end b.read_nonblock((rand * 1024) + 666, buf) wait = (r[0] & done).empty? rescue Errno::EAGAIN break end while true assert_equal thr, thr.join(5) rv = thr.value assert_equal persist, rv.pop assert(rv.all? { |x| x == :wait_writable }) a.close b.close done.each { |io| io.close } end end def test_wbuf_blocked a, b = UNIXSocket.pair buf = "." * 4096 4.times do begin a.write_nonblock(buf) rescue Errno::EAGAIN break end while true end wbuf = Yahns::Wbuf.new([], true) assert_equal :wait_writable, wbuf.wbuf_write(a, buf) assert_equal :wait_writable, wbuf.wbuf_flush(a) # drain the buffer Timeout.timeout(10) { b.read(b.nread) until b.nread == 0 } # b.nread will increase after this assert_nil wbuf.wbuf_write(a, "HI") nr = b.nread assert_operator nr, :>, 0 assert_equal b, IO.select([b], nil, nil, 5)[0][0] b.read(nr - 2) if nr > 2 assert_equal b, IO.select([b], nil, nil, 5)[0][0] assert_equal "HI", b.read(2) begin wbuf.wbuf_flush(a) assert false rescue => e end assert_match(%r{BUG: EOF on tmpio}, e.message) ensure a.close b.close end def test_wbuf_flush_close pipe = IO.pipe persist = true wbuf = Yahns::Wbuf.new(pipe[0], persist) refute wbuf.respond_to?(:close) # we don't want this for HttpResponse body sp = UNIXSocket.pair rv = nil buf = ("*" * 16384) << "\n" thr = Thread.new do 1000.times { pipe[1].write(buf) } pipe[1].close end pipe[0].each { |chunk| rv = wbuf.wbuf_write(sp[1], chunk) } assert_equal thr, thr.join(5) assert_equal :wait_writable, rv done = IO.pipe thr = Thread.new do rv = [] until rv[-1] == persist IO.select(nil, [sp[1]]) rv << wbuf.wbuf_flush(sp[1]) end done[1].syswrite '.' rv end wait = true begin if wait r = IO.select([sp[0],done[0]], nil, nil, 5) end sp[0].read_nonblock(16384, buf) wait = (r[0] & done).empty? rescue Errno::EAGAIN break end while true assert_equal thr, thr.join(5) rv = thr.value assert_equal true, rv.pop assert rv.all? { |x| x == :wait_writable } assert pipe[0].closed? sp.each(&:close) done.each(&:close) end end