# Copyright, 2017, by Samuel G. D. Williams.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'buffer'
require_relative 'generic'

module Async
  module IO
    # A buffered reader/writer layered on top of an underlying IO-like object.
    #
    # Reads are accumulated in an internal read buffer and handed out by the
    # various `read*` methods; writes are accumulated in a write buffer and
    # pushed to the underlying IO once enough data is queued or on #flush.
    class Stream
      BLOCK_SIZE = IO::BLOCK_SIZE

      # Opens the file at `path` and wraps it in a stream.
      #
      # @param path [String] The path passed to `File.open`.
      # @param mode [String] The mode passed to `File.open`.
      # @param options Keyword options forwarded to #initialize.
      # @yield [Stream] If a block is given, the stream is yielded and closed
      #   when the block exits.
      # @return The stream when no block is given, otherwise the block's result.
      def self.open(path, mode = "r+", **options)
        file = File.open(path, mode)

        begin
          stream = new(file, **options)
        rescue
          # Don't leak the file handle if the stream could not be constructed
          # (e.g. an unknown keyword option was supplied).
          file.close
          raise
        end

        return stream unless block_given?

        begin
          yield stream
        ensure
          stream.close
        end
      end

      # @param io The underlying IO. It must respond to the methods this class
      #   calls on it (`sync=`, `write`, `read_nonblock`, `close`, ...).
      # @param block_size [Integer] Minimum amount requested per underlying
      #   read, and the threshold at which buffered writes are written out.
      # @param maximum_read_size [Integer] Upper bound for a single underlying read.
      # @param sync Assigned to `io.sync`.
      # @param deferred When truthy, #flush defers the actual write (see #flush).
      def initialize(io, block_size: BLOCK_SIZE, maximum_read_size: MAXIMUM_READ_SIZE, sync: true, deferred: nil)
        @io = io
        @eof = false

        @deferred = deferred
        # Number of deferred flushes requested since the last actual write.
        @pending = 0

        # We don't want Ruby to do any IO buffering.
        @io.sync = sync

        @block_size = block_size
        @maximum_read_size = maximum_read_size

        @read_buffer = Buffer.new
        @write_buffer = Buffer.new

        # Used as destination buffer for underlying reads.
        @input_buffer = Buffer.new
      end

      # NOTE(review): `reactor` is never assigned inside this class; presumably
      # set by external code — confirm.
      attr_accessor :reactor

      attr :io
      attr :block_size

      # Reads `size` bytes from the stream. If size is not specified, read until end of file.
      #
      # @param size [Integer, nil] Number of bytes wanted, or nil for everything.
      # @return [String, nil] Up to `size` bytes (fewer if eof was reached), an
      #   empty string when `size` is 0, or nil when at eof with nothing buffered.
      def read(size = nil)
        return '' if size == 0

        if size
          until @eof || @read_buffer.bytesize >= size
            # Compute the amount of data we need to read from the underlying stream:
            read_size = size - @read_buffer.bytesize

            # Don't read less than @block_size to avoid lots of small reads:
            fill_read_buffer(read_size > @block_size ? read_size : @block_size)
          end
        else
          fill_read_buffer until @eof
        end

        return consume_read_buffer(size)
      end

      # Read at most `size` bytes from the stream. Will avoid reading from the
      # underlying stream if possible: the underlying stream is only read when
      # nothing is buffered, and at most once.
      #
      # @param size [Integer, nil] Maximum number of bytes, or nil for whatever is buffered.
      # @return [String, nil] The data, or nil at eof with nothing buffered.
      def read_partial(size = nil)
        return '' if size == 0

        if !@eof && @read_buffer.empty?
          fill_read_buffer
        end

        return consume_read_buffer(size)
      end

      # Reads exactly `size` bytes or raises.
      #
      # @param size [Integer] The number of bytes required.
      # @param exception [Class] The exception class to raise on a short read.
      # @return [String] Exactly `size` bytes.
      # @raise [exception] If fewer than `size` bytes could be read.
      def read_exactly(size, exception: EOFError)
        buffer = read(size)

        # #read returns nil when eof was hit with no data buffered at all.
        raise exception, "encountered eof while reading data" unless buffer

        if buffer.bytesize != size
          raise exception, "could not read enough data"
        end

        return buffer
      end

      alias readpartial read_partial

      # Efficiently read data from the stream until encountering pattern.
      #
      # @param pattern [String] The pattern to match.
      # @param offset [Integer] Position in the read buffer at which to start searching.
      # @param chomp [Boolean] When true (the default) the pattern is excluded
      #   from the result; when false it is included.
      # @return [String, nil] The contents of the stream up to the pattern. The
      #   pattern is always consumed from the stream. Returns nil, leaving
      #   buffered data in place, if the underlying stream ran out before the
      #   pattern was found.
      def read_until(pattern, offset = 0, chomp: true)
        # We don't want to split on the pattern, so we subtract the size of the pattern.
        split_offset = pattern.bytesize - 1

        # NOTE(review): `index`/`size` are character based while the slicing
        # below is byte based; this assumes the read buffer is binary encoded
        # so that both agree — confirm against Buffer.
        until index = @read_buffer.index(pattern, offset)
          # Resume searching just before the end of what was already scanned,
          # in case the pattern straddles two underlying reads.
          offset = @read_buffer.size - split_offset

          offset = 0 if offset < 0

          return unless fill_read_buffer
        end

        # Freeze before slicing; see the note in #consume_read_buffer.
        @read_buffer.freeze
        matched = @read_buffer.byteslice(0, index + (chomp ? 0 : pattern.bytesize))
        @read_buffer = @read_buffer.byteslice(index + pattern.bytesize, @read_buffer.bytesize)

        return matched
      end

      # Yields the read buffer to the block, filling it from the underlying
      # stream between calls, until the block returns a truthy value or eof
      # is reached. Nothing is consumed from the buffer.
      def peek
        until yield(@read_buffer) || @eof
          fill_read_buffer
        end
      end

      # Writes `string` to the stream, buffering small writes.
      #
      # If nothing is buffered and `string` is at least one block in size it is
      # written straight to the underlying `io`. Otherwise it is appended to
      # the write buffer, which is written out once it holds at least one block.
      #
      # @param string the string to write.
      # @return [Integer] the number of bytes in `string`.
      def write(string)
        if @write_buffer.empty? && string.bytesize >= @block_size
          @io.write(string)
        else
          @write_buffer << string

          if @write_buffer.bytesize >= @block_size
            @io.write(@write_buffer)
            @write_buffer.clear
          end
        end

        return string.bytesize
      end

      # Writes `string` to the stream and returns self.
      def <<(string)
        write(string)

        return self
      end

      # Flushes buffered data to the stream.
      #
      # When the stream was created with a truthy `deferred` option and a
      # current task is available, the write is not performed here: the stream
      # is appended to the task's reactor (once per batch) and the pending
      # count is incremented; the data is written later by #resume.
      # Otherwise the write buffer is written out immediately.
      def flush
        return if @write_buffer.empty?

        # NOTE(review): `Task.current` may raise rather than return nil when no
        # task is running; if so the immediate branch is only reachable when
        # @deferred is falsy — confirm whether `Task.current?` was intended.
        if @deferred && (task = Task.current)
          task.reactor << self if @pending.zero?

          @pending += 1
        else
          Async.logger.debug(self) {"Flushing immediate write (#{@write_buffer.bytesize} bytes)..."}
          @io.write(@write_buffer)
          @write_buffer.clear
          @pending = 0
        end
      end

      # @return [Boolean] true while at least one deferred flush is pending.
      def alive?
        @pending > 0
      end

      # Writes out the write buffer and resets the pending flush count.
      # Presumably invoked by the reactor the stream was appended to in #flush
      # — confirm against the reactor implementation.
      def resume
        Async.logger.debug(self) {"Flushing #{@pending} writes (#{@write_buffer.bytesize} bytes)..."}
        @io.write(@write_buffer)
        @write_buffer.clear
        @pending = 0
      end

      # Reads up to and including `separator`; see #read_until for options
      # and return value.
      def gets(separator = $/, **options)
        read_until(separator, **options)
      end

      # Appends each argument followed by `separator` to the write buffer,
      # then flushes.
      def puts(*args, separator: $/)
        args.each do |arg|
          @write_buffer << arg << separator
        end

        flush
      end

      def connected?
        @io.connected?
      end

      def closed?
        @io.closed?
      end

      def close_read
        @io.close_read
      end

      # Writes out any buffered data, then closes the write side of the
      # underlying IO (even if writing the buffered data raised).
      def close_write
        resume unless @write_buffer.empty?
      ensure
        @io.close_write
      end

      # Best effort to flush any unwritten data, and then close the underlying IO.
      def close
        return if @io.closed?

        begin
          resume unless @write_buffer.empty?
        rescue
          # We really can't do anything here unless we want #close to raise exceptions.
        ensure
          @io.close
        end
      end

      # Returns true if the stream is at end of file, which means there is no
      # more data to be read. May read from the underlying stream to find out.
      def eof?
        return false unless @read_buffer.empty?

        fill_read_buffer unless @eof

        return @eof
      end

      alias eof eof?

      # Discards any buffered input, marks the stream as being at eof and
      # raises.
      #
      # @raise [EOFError] always.
      def eof!
        @read_buffer.clear
        @eof = true

        raise EOFError
      end

      private

      # Fills the buffer from the underlying stream.
      #
      # @param size [Integer] The number of bytes to request, capped at the
      #   maximum read size.
      # @return [Boolean] true if data was read; false if the underlying read
      #   returned a falsy value, in which case the stream is marked as eof.
      def fill_read_buffer(size = @block_size)
        # We impose a limit because the underlying `read` system call can fail if we request too much data in one go.
        size = @maximum_read_size if size > @maximum_read_size

        if @read_buffer.empty?
          # Nothing buffered: read directly into the read buffer.
          return true if @io.read_nonblock(size, @read_buffer, exception: false)
        elsif chunk = @io.read_nonblock(size, @input_buffer, exception: false)
          # Data already buffered: read into the scratch buffer and append.
          @read_buffer << chunk
          return true
        end

        # else for both cases above:
        @eof = true
        return false
      end

      # Consumes at most `size` bytes from the buffer.
      #
      # @param size [Integer|nil] The amount of data to consume. If nil, consume entire buffer.
      # @return [String, nil] The consumed data, or nil when at eof with an empty buffer.
      def consume_read_buffer(size = nil)
        # If we are at eof, and the read buffer is empty, we can't consume anything.
        return nil if @eof && @read_buffer.empty?

        result = nil

        if size.nil? || size >= @read_buffer.bytesize
          # Consume the entire read buffer:
          result = @read_buffer
          @read_buffer = Buffer.new
        else
          # This approach uses more memory.
          # result = @read_buffer.slice!(0, size)

          # We know that we are not going to reuse the original buffer.
          # But byteslice will generate a hidden copy. So let's freeze it first:
          @read_buffer.freeze

          result = @read_buffer.byteslice(0, size)
          @read_buffer = @read_buffer.byteslice(size, @read_buffer.bytesize)
        end

        return result
      end
    end
  end
end