# frozen_string_literal: true

# Copyright, 2017, by Samuel G. D. Williams.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'buffer'
require_relative 'generic'

require 'async/semaphore'

module Async
  module IO
    # A buffered stream on top of an underlying IO object.
    #
    # Input is read from the underlying IO in chunks and kept in a read buffer, so callers can read exact
    # sizes, partial chunks, or up to a delimiter. Output accumulates in a write buffer. It is written to
    # the underlying IO when:
    # - it reaches `block_size` bytes;
    # - #flush or #puts is called;
    # - #close or #close_write is called;
    # - deferred flushes are pending when more input is read.
    class Stream
      BLOCK_SIZE = IO::BLOCK_SIZE

      # Open the file at `path` and wrap it in a new stream.
      #
      # @param path [String] the file to open.
      # @param mode [String] the mode passed to `File.open`.
      # @param options [Hash] keyword options forwarded to {#initialize}.
      # @yield [stream] if a block is given, the stream is yielded and closed when the block exits.
      # @return the stream if no block is given, otherwise the block's result.
      def self.open(path, mode = "r+", **options)
        file = File.open(path, mode)

        begin
          stream = self.new(file, **options)
        ensure
          # If construction raised (e.g. an unknown keyword option), nothing else owns the file,
          # so close it here instead of leaking the descriptor:
          file.close unless stream
        end

        return stream unless block_given?

        begin
          yield stream
        ensure
          stream.close
        end
      end

      # @param io [IO] the underlying IO object that data is read from and written to.
      # @param block_size [Integer] the minimum size of an underlying read, and the write-buffer size at
      #   which #write flushes.
      # @param maximum_read_size [Integer] the largest amount of data requested from the underlying IO in a
      #   single read. NOTE(review): the `MAXIMUM_READ_SIZE` default is defined outside this file
      #   (presumably alongside `IO::BLOCK_SIZE`) — confirm.
      # @param sync [Boolean] assigned to `io.sync`.
      # @param deferred [Boolean] the default for the `deferred:` option of {#flush}.
      def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: false)
        @io = io
        @eof = false

        @deferred = deferred
        # The number of deferred flushes requested since the write buffer was last drained.
        @pending = 0
        # Serialises drains of the write buffer: only one drain runs at a time.
        @writing = Async::Semaphore.new(1)

        # We don't want Ruby to do any IO buffering.
        @io.sync = sync

        @block_size = block_size
        @maximum_read_size = maximum_read_size

        @read_buffer = Buffer.new
        @write_buffer = Buffer.new
        @drain_buffer = Buffer.new

        # Used as destination buffer for underlying reads.
        @input_buffer = Buffer.new
      end

      attr_reader :io
      attr_reader :block_size

      # Reads `size` bytes from the stream. If size is not specified, read until end of file.
      #
      # @param size [Integer | Nil] the number of bytes to read.
      # @return [String | Nil] up to `size` bytes (fewer if end of file is reached first), or nil if the
      #   stream is at end of file and no buffered data remains.
      def read(size = nil)
        # A `''` literal would be frozen (see the magic comment) and carry the source encoding;
        # return a fresh, mutable binary string instead.
        return String.new(encoding: Encoding::BINARY) if size == 0

        if size
          until @eof || @read_buffer.bytesize >= size
            # Compute the amount of data we need to read from the underlying stream:
            read_size = size - @read_buffer.bytesize

            # Don't read less than @block_size to avoid lots of small reads:
            fill_read_buffer(read_size > @block_size ? read_size : @block_size)
          end
        else
          fill_read_buffer until @eof
        end

        return consume_read_buffer(size)
      end

      # Read at most `size` bytes from the stream. Will avoid reading from the underlying stream if possible.
      #
      # The underlying IO is only read when the read buffer is empty.
      # @param size [Integer | Nil] the maximum number of bytes to return; nil returns everything buffered.
      # @return [String | Nil] the data, or nil at end of file with nothing buffered.
      def read_partial(size = nil)
        # Same reasoning as #read: avoid returning a frozen, non-binary literal.
        return String.new(encoding: Encoding::BINARY) if size == 0

        fill_read_buffer if !@eof && @read_buffer.empty?

        return consume_read_buffer(size)
      end

      # Read exactly `size` bytes from the stream.
      #
      # @param size [Integer] the number of bytes to read.
      # @param exception [Class] the exception class raised when not enough data is available.
      # @return [String] exactly `size` bytes.
      # @raise [exception] if end of file is reached before `size` bytes could be read.
      def read_exactly(size, exception: EOFError)
        if buffer = read(size)
          if buffer.bytesize != size
            raise exception, "could not read enough data"
          end

          return buffer
        end

        raise exception, "encountered eof while reading data"
      end

      alias readpartial read_partial

      # Efficiently read data from the stream until encountering pattern.
      # @param pattern [String] The pattern to match.
      # @param offset [Integer] the position in the read buffer at which to start searching.
      # @param chomp [Boolean] whether to leave the pattern out of the returned data.
      # @return [String | Nil] The contents of the stream up until the pattern. The pattern itself is always
      #   consumed, and is included in the result only when `chomp` is false. Returns nil if end of file is
      #   reached before the pattern is found.
      def read_until(pattern, offset = 0, chomp: true)
        # We don't want to split on the pattern, so we subtract the size of the pattern.
        split_offset = pattern.bytesize - 1

        until index = @read_buffer.index(pattern, offset)
          # Resume the next search slightly before the end of the scanned data, so that a pattern
          # straddling the boundary with the next chunk is still found:
          offset = @read_buffer.bytesize - split_offset
          offset = 0 if offset < 0

          return unless fill_read_buffer
        end

        @read_buffer.freeze
        matched = @read_buffer.byteslice(0, index + (chomp ? 0 : pattern.bytesize))
        @read_buffer = @read_buffer.byteslice(index + pattern.bytesize, @read_buffer.bytesize)

        return matched
      end

      # Keep filling the read buffer until the block returns a truthy value for it, or end of file is
      # reached. No data is consumed.
      # @yield [buffer] the current read buffer.
      def peek
        until yield(@read_buffer) || @eof
          fill_read_buffer
        end
      end

      # Read a line; equivalent to {#read_until} with `separator` as the pattern.
      def gets(separator = $/, **options)
        read_until(separator, **options)
      end

      # Flushes buffered data to the stream.
      #
      # @param deferred [Boolean] when true and called from within a task, the first deferred flush yields
      #   the current task once before draining, so later writes (and their flushes) can be coalesced into
      #   a single write to the underlying IO. Deferred flushes made while one is pending are only counted.
      def flush(deferred: @deferred)
        if deferred && (task = Task.current?)
          @pending += 1

          if @pending == 1
            task.yield

            drain_write_buffer unless @write_buffer.empty?
          end
        else
          drain_write_buffer unless @write_buffer.empty?
        end
      end

      # Writes `string` to the write buffer, and flushes the buffer to the underlying `io` once it holds at
      # least `block_size` bytes.
      # @param string [String] the string to write to the buffer.
      # @return [Integer] the number of bytes appended to the buffer.
      def write(string)
        @write_buffer << string

        flush if @write_buffer.bytesize >= @block_size

        return string.bytesize
      end

      # Writes `string` to the stream and returns self.
      def <<(string)
        write(string)

        return self
      end

      # Appends each argument followed by `separator` to the write buffer, then flushes.
      # NOTE(review): arguments are appended with `String#<<`, so they should be strings. An Integer
      # argument would be appended as a codepoint, not as its decimal representation.
      def puts(*arguments, separator: $/)
        arguments.each do |argument|
          @write_buffer << argument << separator
        end

        flush
      end

      def connected?
        @io.connected?
      end

      def closed?
        @io.closed?
      end

      def close_read
        @io.close_read
      end

      # Drains any buffered output, then closes the write side of the underlying IO, even if draining fails.
      def close_write
        drain_write_buffer unless @write_buffer.empty?
      ensure
        @io.close_write
      end

      # Best effort to flush any unwritten data, and then close the underlying IO.
      def close
        return if @io.closed?

        begin
          drain_write_buffer unless @write_buffer.empty?
        rescue
          # We really can't do anything here unless we want #close to raise exceptions.
        ensure
          @io.close
        end
      end

      # Returns true if the stream is at end of file, which means there is no more data to be read.
      def eof?
        if !@read_buffer.empty?
          return false
        elsif @eof
          return true
        else
          return @io.eof?
        end
      end

      alias eof eof?

      # Discards any buffered input, marks the stream as being at end of file, and raises.
      # @raise [EOFError] always.
      def eof!
        @read_buffer.clear
        @eof = true

        raise EOFError
      end

      private

      # Writes the contents of the write buffer to the underlying IO.
      #
      # Runs under the `@writing` semaphore. The write and drain buffers are swapped first, so data
      # appended by #write during the underlying write lands in the (already cleared) other buffer.
      def drain_write_buffer
        @writing.acquire do
          Async.logger.debug(self) do |buffer|
            if @pending > 0
              buffer.puts "Draining #{@pending} writes (#{@write_buffer.bytesize} bytes)..."
            else
              buffer.puts "Draining immediate write (#{@write_buffer.bytesize} bytes)..."
            end

            # buffer.puts "@write_buffer = #{@write_buffer.inspect}"
            # buffer.puts "@drain_buffer = #{@drain_buffer.inspect}"
          end

          # Flip the write buffer and drain buffer:
          @write_buffer, @drain_buffer = @drain_buffer, @write_buffer

          # The write buffer no longer contains pending writes:
          @pending = 0

          begin
            @io.write(@drain_buffer)
          ensure
            # If the write operation fails, we still need to clear this buffer, and the data is essentially lost.
            @drain_buffer.clear
          end
        end
      end

      # Fills the buffer from the underlying stream.
      #
      # Any pending deferred writes are drained first.
      # @param size [Integer] how much data to request; capped at `@maximum_read_size`.
      # @return [Boolean] true if data was read. Otherwise false, after marking the stream as at end of
      #   file (the underlying read returned nil/false).
      def fill_read_buffer(size = @block_size)
        # We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
        size = @maximum_read_size if size > @maximum_read_size

        # This effectively ties the input and output stream together.
        drain_write_buffer if @pending > 0

        # NOTE(review): with `exception: false`, a plain non-blocking IO returns `:wait_readable` when no
        # data is available. This code assumes `@io.read_nonblock` waits instead of returning it — confirm.
        if @read_buffer.empty?
          # Read straight into the (empty) read buffer to avoid an extra copy:
          if @io.read_nonblock(size, @read_buffer, exception: false)
            # Async.logger.debug(self, name: "read") {@read_buffer.inspect}
            return true
          end
        else
          # `read_nonblock` replaces the destination buffer's contents, so read into a scratch buffer and append:
          if chunk = @io.read_nonblock(size, @input_buffer, exception: false)
            @read_buffer << chunk
            # Async.logger.debug(self, name: "read") {@read_buffer.inspect}
            return true
          end
        end

        # else for both cases above:
        @eof = true
        return false
      end

      # Consumes at most `size` bytes from the buffer.
      # @param size [Integer|nil] The amount of data to consume. If nil, consume entire buffer.
      # @return [String | Nil] the consumed data, or nil if the stream is at end of file and the buffer is empty.
      def consume_read_buffer(size = nil)
        # If we are at eof, and the read buffer is empty, we can't consume anything.
        return nil if @eof && @read_buffer.empty?

        result = nil

        if size.nil? || size >= @read_buffer.bytesize
          # Consume the entire read buffer:
          result = @read_buffer
          @read_buffer = Buffer.new
        else
          # This approach uses more memory.
          # result = @read_buffer.slice!(0, size)

          # We know that we are not going to reuse the original buffer.
          # But byteslice will generate a hidden copy. So let's freeze it first:
          @read_buffer.freeze

          result = @read_buffer.byteslice(0, size)
          @read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
        end

        return result
      end
    end
  end
end