#-- # Copyright (C)2007-10 Tony Arcieri # You can redistribute this under the terms of the Ruby license # See file LICENSE for details #++ module Coolio # A buffered I/O class witch fits into the Coolio Watcher framework. # It provides both an observer which reads data as it's received # from the wire and a buffered write watcher which stores data and writes # it out each time the socket becomes writable. # # This class is primarily meant as a base class for other streams # which need non-blocking writing, and is used to implement Coolio's # Socket class and its associated subclasses. class IO extend Meta # Maximum number of bytes to consume at once INPUT_SIZE = 16384 def initialize(io) @_io = io @_write_buffer ||= ::IO::Buffer.new @_read_watcher = Watcher.new(io, self, :r) @_write_watcher = Watcher.new(io, self, :w) end # # Watcher methods, delegated to @_read_watcher # # Attach to the event loop def attach(loop); @_read_watcher.attach loop; schedule_write if !@_write_buffer.empty?; self; end # Detach from the event loop def detach; @_read_watcher.detach; self; end # TODO should these detect write buffers, as well? # Enable the watcher def enable; @_read_watcher.enable; self; end # Disable the watcher def disable; @_read_watcher.disable; self; end # Is the watcher attached? def attached?; @_read_watcher.attached?; end # Is the watcher enabled? def enabled?; @_read_watcher.enabled?; end # Obtain the event loop associated with this object def evloop; @_read_watcher.evloop; end # # Callbacks for asynchronous events # # Called whenever the IO object receives data def on_read(data); end event_callback :on_read # Called whenever a write completes and the output buffer is empty def on_write_complete; end event_callback :on_write_complete # Called whenever the IO object hits EOF def on_close; end event_callback :on_close # # Write interface # # Write data in a buffered, non-blocking manner def write(data) @_write_buffer << data schedule_write data.size end # Number of bytes are currently in the output buffer def output_buffer_size @_write_buffer.size end # Close the IO stream def close detach if attached? detach_write_watcher @_io.close unless @_io.closed? on_close nil end # Is the IO object closed? def closed? @_io.nil? or @_io.closed? end ######### protected ######### # Read from the input buffer and dispatch to on_read def on_readable begin on_read @_io.read_nonblock(INPUT_SIZE) rescue Errno::EAGAIN, Errno::EINTR return # SystemCallError catches Errno::ECONNRESET amongst others. rescue SystemCallError, EOFError, IOError, SocketError close end end # Write the contents of the output buffer def on_writable begin @_write_buffer.write_to(@_io) rescue Errno::EINTR return # SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others. rescue SystemCallError, IOError, SocketError return close end if @_write_buffer.empty? disable_write_watcher on_write_complete end end # Schedule a write to be performed when the IO object becomes writable def schedule_write return unless @_io # this would mean 'we are still pre DNS here' return unless attached? # this would mean 'currently unattached' -- ie still pre DNS, or just plain not attached, which is ok begin enable_write_watcher rescue IOError end end def enable_write_watcher if @_write_watcher.attached? @_write_watcher.enable unless @_write_watcher.enabled? else @_write_watcher.attach(evloop) end end def disable_write_watcher @_write_watcher.disable if @_write_watcher and @_write_watcher.enabled? end def detach_write_watcher @_write_watcher.detach if @_write_watcher and @_write_watcher.attached? end # Internal class implementing watchers used by Coolio::IO class Watcher < IOWatcher def initialize(ruby_io, coolio_io, flags) @coolio_io = coolio_io super(ruby_io, flags) end # Configure IOWatcher event callbacks to call the method passed to #initialize def on_readable; @coolio_io.__send__(:on_readable); end def on_writable; @coolio_io.__send__(:on_writable); end end end end