# frozen_string_literal: true # Copyright, 2017, by Samuel G. D. Williams. # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. require 'async/io/socket' require 'async/clock' require_relative 'generic_examples' require_relative 'stream_context' RSpec.describe Async::IO::Stream do # This constant is part of the public interface, but was renamed to `Async::IO::BLOCK_SIZE`. describe "::BLOCK_SIZE" do it "should exist and be reasonable" do expect(Async::IO::Stream::BLOCK_SIZE).to be_between(1024, 1024*128) end end context "socket I/O" do let(:sockets) do @sockets = Async::IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM) end after do @sockets&.each(&:close) end let(:io) {sockets.first} subject {described_class.new(sockets.last)} it_should_behave_like Async::IO describe '#drain_write_buffer' do include_context Async::RSpec::Reactor let(:output) {described_class.new(sockets.last)} subject {described_class.new(sockets.first)} let(:buffer_size) {1024*6} it "can interleave calls to flush" do tasks = 2.times.map do |i| reactor.async do buffer = i.to_s * buffer_size 128.times do output.write(buffer) output.flush end end end reactor.async do tasks.each(&:wait) output.close end Async::Task.current.sleep(1) while buffer = subject.read(buffer_size) expect(buffer).to be == (buffer[0] * buffer_size) end end it "handles write failures" do subject.close task = reactor.async do output.write("Hello World") output.flush(deferred: true) end expect do task.wait end.to raise_error(Errno::EPIPE) write_buffer = output.instance_variable_get(:@write_buffer) drain_buffer = output.instance_variable_get(:@drain_buffer) expect(write_buffer).to be_empty expect(drain_buffer).to be_empty end end describe '#close_read' do subject {described_class.new(sockets.last)} it "can close the reading end of the stream" do expect(subject.io).to receive(:close_read).and_call_original subject.close_read # Ruby <= 2.4 raises an exception even with exception: false # expect(stream.read).to be_nil end it "can close the writing end of the stream" do expect(subject.io).to receive(:close_write).and_call_original subject.write("Oh yes!") subject.close_write expect do subject.write("Oh no!") subject.flush end.to raise_error(IOError, /not opened for writing/) end end describe '#read_exactly' do it "can read several bytes" do io.write("hello\nworld\n") expect(subject.read_exactly(4)).to be == 'hell' end it "can raise exception if io is eof" do io.close expect do subject.read_exactly(4) end.to raise_error(EOFError) end end end context "performance (BLOCK_SIZE: #{Async::IO::BLOCK_SIZE} MAXIMUM_READ_SIZE: #{Async::IO::MAXIMUM_READ_SIZE})" do include_context Async::RSpec::Reactor let!(:stream) {described_class.open("/dev/zero")} after {stream.close} it "can read data quickly" do |example| data = nil duration = Async::Clock.measure do data = stream.read(1024**3) end size = data.bytesize / 1024**2 rate = size / duration example.reporter.message "Read #{size.round(2)}MB of data at #{rate.round(2)}MB/s." expect(rate).to be > 128 end end context "buffered I/O" do include_context Async::IO::Stream include_context Async::RSpec::Memory include_context Async::RSpec::Reactor describe '#read' do it "should read everything" do io.write "Hello World" io.seek(0) expect(subject.io).to receive(:read_nonblock).and_call_original.twice expect(subject.read).to be == "Hello World" expect(subject).to be_eof end it "should read only the amount requested" do io.write "Hello World" io.seek(0) expect(subject.io).to receive(:read_nonblock).and_call_original.once expect(subject.read_partial(4)).to be == "Hell" expect(subject).to_not be_eof expect(subject.read_partial(20)).to be == "o World" expect(subject).to be_eof end context "with large content" do it "allocates expected amount of bytes" do io.write("." * 16*1024) io.seek(0) buffer = nil expect do # The read buffer is already allocated, and it will be resized to fit the incoming data. It will be swapped with an empty buffer. buffer = subject.read(16*1024) end.to limit_allocations.of(String, count: 1, size: 0) expect(buffer.size).to be == 16*1024 end end end describe '#read_until' do it "can read a line" do io.write("hello\nworld\n") io.seek(0) expect(subject.read_until("\n")).to be == 'hello' expect(subject.read_until("\n")).to be == 'world' expect(subject.read_until("\n")).to be_nil end context "with 1-byte block size" do subject! {Async::IO::Stream.new(buffer, block_size: 1)} it "can read a line with a multi-byte pattern" do io.write("hello\r\nworld\r\n") io.seek(0) expect(subject.read_until("\r\n")).to be == 'hello' expect(subject.read_until("\r\n")).to be == 'world' expect(subject.read_until("\r\n")).to be_nil end end context "with large content" do it "allocates expected amount of bytes" do subject expect do subject.read_until("b") end.to limit_allocations.of(String, size: 0, count: 1) end end end describe '#flush' do it "should not call write if write buffer is empty" do expect(subject.io).to_not receive(:write) subject.flush end it "should flush underlying data when it exceeds block size" do expect(subject.io).to receive(:write).and_call_original.once subject.block_size.times do subject.write("!") end end end describe '#read_partial' do before(:each) do string = "Hello World!" io.write(string * (1 + (Async::IO::BLOCK_SIZE / string.bytesize))) io.seek(0) end it "should avoid calling read" do expect(subject.io).to receive(:read_nonblock).and_call_original.once expect(subject.read_partial(12)).to be == "Hello World!" end context "with large content" do it "allocates only the amount required" do expect do subject.read(4*1024) end.to limit_allocations.of(String, count: 2, size: 4*1024+1) end it "allocates exact number of bytes being read" do expect do subject.read_partial(subject.block_size * 2) end.to limit_allocations.of(String, count: 1, size: 0) end it "allocates expected amount of bytes" do buffer = nil expect do buffer = subject.read_partial end.to limit_allocations.of(String, count: 1) expect(buffer.size).to be == subject.block_size end end end describe '#write' do it "should read one line" do expect(subject.io).to receive(:write).and_call_original.once subject.write "Hello World\n" subject.flush io.seek(0) expect(subject.read).to be == "Hello World\n" end end describe '#eof' do it "should terminate subject" do expect do subject.eof! end.to raise_exception(EOFError) expect(subject).to be_eof end end describe '#close' do it 'can be closed even if underlying io is closed' do io.close expect(subject.io).to be_closed # Put some data in the write buffer subject.write "." expect do subject.close end.to_not raise_exception end end end end