lib/rev/buffered_io.rb in rev-0.1.0 vs lib/rev/buffered_io.rb in rev-0.1.1

- old
+ new

@@ -6,10 +6,13 @@ require File.dirname(__FILE__) + '/../rev' module Rev class BufferedIO < IOWatcher + # Maximum number of bytes to consume at once + INPUT_SIZE = 16384 + def initialize(io) # Output buffer @write_buffer = '' # Coerce the argument into an IO object if possible @@ -39,74 +42,88 @@ # Write data in a buffered, non-blocking manner def write(data) # Attempt a zero copy write if @write_buffer.empty? - written = @io.write_nonblock data + written = write_nonblock data # If we lucked out and wrote out the whole buffer, return if written == data.size on_write_complete return data.size end - # Otherwise append the remaining data to the buffer - @write_buffer << data[written..data.size] - else - @write_buffer << data + # Otherwise slice what we wrote out and begin buffered writing + data.slice!(0, written) if written end - + + @write_buffer << data schedule_write data.size end # Number of bytes are currently in the output buffer def output_buffer_size @write_buffer.size end + # Close the BufferedIO stream + def close + detach if attached? + @writer.detach if @writer and @writer.attached? + @io.close unless @io.closed? + + on_close + end + + ######### + protected + ######### + # Attempt to write the contents of the output buffer def write_output_buffer return if @write_buffer.empty? - written = @io.write_nonblock @write_buffer - @write_buffer.slice!(written, @write_buffer.size) + written = write_nonblock @write_buffer + @write_buffer.slice!(0, written) if written return unless @write_buffer.empty? @writer.disable if @writer and @writer.enabled? on_write_complete end - - # Close the BufferedIO stream - def close - detach if attached? - @writer.detach if @writer and @writer.attached? - @io.close - - on_close + + # Wrapper for handling reset connections and EAGAIN + def write_nonblock(data) + begin + @io.write_nonblock(data) + rescue Errno::ECONNRESET, Errno::EPIPE + close + rescue Errno::EAGAIN + end end - ######### - protected - ######### - # Inherited callback from IOWatcher def on_readable begin - on_read @io.read_nonblock(4096) - rescue EOFError + on_read @io.read_nonblock(INPUT_SIZE) + rescue Errno::ECONNRESET, EOFError close end end def schedule_write return if @writer and @writer.enabled? if @writer @writer.enable - else - @writer = Writer.new(@io, self) + else + begin + @writer = Writer.new(@io, self) + rescue IOError + return + end + @writer.attach(evloop) end end class Writer < IOWatcher @@ -114,10 +131,10 @@ @buffered_io = buffered_io super(io, :w) end def on_writable - @buffered_io.write_output_buffer + @buffered_io.__send__(:write_output_buffer) end end end end