lib/rev/buffered_io.rb in rev-0.1.0 vs lib/rev/buffered_io.rb in rev-0.1.1
- old
+ new
@@ -6,10 +6,13 @@
require File.dirname(__FILE__) + '/../rev'
module Rev
class BufferedIO < IOWatcher
+ # Maximum number of bytes to consume at once
+ INPUT_SIZE = 16384
+
def initialize(io)
# Output buffer
@write_buffer = ''
# Coerce the argument into an IO object if possible
@@ -39,74 +42,88 @@
# Write data in a buffered, non-blocking manner
def write(data)
# Attempt a zero copy write
if @write_buffer.empty?
- written = @io.write_nonblock data
+ written = write_nonblock data
# If we lucked out and wrote out the whole buffer, return
if written == data.size
on_write_complete
return data.size
end
- # Otherwise append the remaining data to the buffer
- @write_buffer << data[written..data.size]
- else
- @write_buffer << data
+ # Otherwise slice what we wrote out and begin buffered writing
+ data.slice!(0, written) if written
end
-
+
+ @write_buffer << data
schedule_write
data.size
end
# Number of bytes are currently in the output buffer
def output_buffer_size
@write_buffer.size
end
+ # Close the BufferedIO stream
+ def close
+ detach if attached?
+ @writer.detach if @writer and @writer.attached?
+ @io.close unless @io.closed?
+
+ on_close
+ end
+
+ #########
+ protected
+ #########
+
# Attempt to write the contents of the output buffer
def write_output_buffer
return if @write_buffer.empty?
- written = @io.write_nonblock @write_buffer
- @write_buffer.slice!(written, @write_buffer.size)
+ written = write_nonblock @write_buffer
+ @write_buffer.slice!(0, written) if written
return unless @write_buffer.empty?
@writer.disable if @writer and @writer.enabled?
on_write_complete
end
-
- # Close the BufferedIO stream
- def close
- detach if attached?
- @writer.detach if @writer and @writer.attached?
- @io.close
-
- on_close
+
+ # Wrapper for handling reset connections and EAGAIN
+ def write_nonblock(data)
+ begin
+ @io.write_nonblock(data)
+ rescue Errno::ECONNRESET, Errno::EPIPE
+ close
+ rescue Errno::EAGAIN
+ end
end
- #########
- protected
- #########
-
# Inherited callback from IOWatcher
def on_readable
begin
- on_read @io.read_nonblock(4096)
- rescue EOFError
+ on_read @io.read_nonblock(INPUT_SIZE)
+ rescue Errno::ECONNRESET, EOFError
close
end
end
def schedule_write
return if @writer and @writer.enabled?
if @writer
@writer.enable
- else
- @writer = Writer.new(@io, self)
+ else
+ begin
+ @writer = Writer.new(@io, self)
+ rescue IOError
+ return
+ end
+
@writer.attach(evloop)
end
end
class Writer < IOWatcher
@@ -114,10 +131,10 @@
@buffered_io = buffered_io
super(io, :w)
end
def on_writable
- @buffered_io.write_output_buffer
+ @buffered_io.__send__(:write_output_buffer)
end
end
end
end