# $Id$ # # Author:: Francis Cianfrocca (gmail: blackhedd) # Homepage:: http://rubyeventmachine.com # Date:: 8 Apr 2006 # # See EventMachine and EventMachine::Connection for documentation and # usage examples. # #---------------------------------------------------------------------------- # # Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved. # Gmail: blackhedd # # This program is free software; you can redistribute it and/or modify # it under the terms of either: 1) the GNU General Public License # as published by the Free Software Foundation; either version 2 of the # License, or (at your option) any later version; or 2) Ruby's License. # # See the file COPYING for complete licensing information. # #------------------------------------------------------------------- # # # TODO List: # TCP-connects currently assume non-blocking connect is available- need to # degrade automatically on versions of Ruby prior to June 2006. # require 'singleton' require 'forwardable' require 'socket' require 'fcntl' require 'set' module EventMachine class << self # This is mostly useful for automated tests. # Return a distinctive symbol so the caller knows whether he's dealing # with an extension or with a pure-Ruby library. def library_type :pure_ruby end # #initialize_event_machine def initialize_event_machine Reactor.instance.initialize_for_run end # #add_oneshot_timer #-- # Changed 04Oct06: intervals from the caller are now in milliseconds, but our native-ruby # processor still wants them in seconds. def add_oneshot_timer interval Reactor.instance.install_oneshot_timer(interval / 1000) end # run_machine def run_machine Reactor.instance.run end # release_machine. Probably a no-op. def release_machine end # #stop def stop Reactor.instance.stop end # #connect_server. Return a connection descriptor to the caller. # TODO, what do we return here if we can't connect? def connect_server host, port, bind_host EvmaTCPClient.connect(host, port, bind_host).uuid end # #send_data def send_data target, data, datalength selectable = Reactor.instance.get_selectable( target ) or raise "unknown send_data target" selectable.send_data data end # #close_connection # The extension version does NOT raise any kind of an error if an attempt is made # to close a non-existent connection. Not sure whether we should. For now, we'll # raise an error here in that case. def close_connection target, after_writing selectable = Reactor.instance.get_selectable( target ) or raise "unknown close_connection target" selectable.schedule_close after_writing end # #start_tcp_server def start_tcp_server host, port (s = EvmaTCPServer.start_server host, port) or raise "no acceptor" s.uuid end # #stop_tcp_server def stop_tcp_server sig s = Reactor.instance.get_selectable(sig) s.schedule_close end # #start_unix_server def start_unix_server chain (s = EvmaUNIXServer.start_server chain) or raise "no acceptor" s.uuid end # #connect_unix_server def connect_unix_server chain EvmaUNIXClient.connect(chain).uuid end # #signal_loopbreak def signal_loopbreak Reactor.instance.signal_loopbreak end # #get_peername def get_peername sig selectable = Reactor.instance.get_selectable( sig ) or raise "unknown get_peername target" selectable.get_peername end # #open_udp_socket def open_udp_socket host, port EvmaUDPSocket.create(host, port).uuid end # #send_datagram. This is currently only for UDP! # We need to make it work with unix-domain sockets as well. def send_datagram target, data, datalength, host, port selectable = Reactor.instance.get_selectable( target ) or raise "unknown send_data target" selectable.send_datagram data, Socket::pack_sockaddr_in(port, host) end # #set_timer_quantum in milliseconds. The underlying Reactor function wants a (possibly # fractional) number of seconds. def set_timer_quantum interval Reactor.instance.set_timer_quantum(( 1.0 * interval) / 1000.0) end # #epoll is a harmless no-op in the pure-Ruby implementation. This is intended to ensure # that user code behaves properly across different EM implementations. def epoll end # #ssl? is not implemented for pure-Ruby implementation def ssl? false end # #set_rlimit_nofile is a no-op in the pure-Ruby implementation. We simply return Ruby's built-in # per-process file-descriptor limit. def set_rlimit_nofile n 1024 end # #set_max_timer_count is a harmless no-op in pure Ruby, which doesn't have a built-in limit # on the number of available timers. def set_max_timer_count n end # #send_file_data def send_file_data sig, filename sz = File.size(filename) raise "file too large" if sz > 32*1024 data = begin File.read filename rescue "" end send_data sig, data, data.length end # #get_outbound_data_size # def get_outbound_data_size sig r = Reactor.instance.get_selectable( sig ) or raise "unknown get_outbound_data_size target" r.get_outbound_data_size end # #read_keyboard # def read_keyboard EvmaKeyboard.open.uuid end # #set_comm_inactivity_timeout # def set_comm_inactivity_timeout sig, tm r = Reactor.instance.get_selectable( sig ) or raise "unknown set_comm_inactivity_timeout target" r.set_inactivity_timeout tm end end end #----------------------------------------------------------------- module EventMachine class Error < Exception; end end #----------------------------------------------------------------- module EventMachine class Connection def get_outbound_data_size EventMachine::get_outbound_data_size @signature end end end #----------------------------------------------------------------- module EventMachine # Factored out so we can substitute other implementations # here if desired, such as the one in ActiveRBAC. module UuidGenerator def self.generate if @ix and @ix >= 10000 @ix = nil @seed = nil end @seed ||= `uuidgen`.chomp.gsub(/-/,"") @ix ||= 0 "#{@seed}#{@ix += 1}" end end end #----------------------------------------------------------------- module EventMachine TimerFired = 100 ConnectionData = 101 ConnectionUnbound = 102 ConnectionAccepted = 103 ConnectionCompleted = 104 LoopbreakSignalled = 105 end #----------------------------------------------------------------- module EventMachine class Reactor include Singleton HeartbeatInterval = 2 attr_reader :current_loop_time def initialize initialize_for_run end #-- # Replaced original implementation 05Dec07, was way too slow because of the sort. def install_oneshot_timer interval uuid = UuidGenerator::generate #@timers << [Time.now + interval, uuid] #@timers.sort! {|a,b| a.first <=> b.first} @timers.add([Time.now + interval, uuid]) uuid end # Called before run, this is a good place to clear out arrays # with cruft that may be left over from a previous run. def initialize_for_run @running = false @stop_scheduled = false @selectables ||= {}; @selectables.clear @timers = SortedSet.new # [] set_timer_quantum(0.1) @current_loop_time = Time.now @next_heartbeat = @current_loop_time + HeartbeatInterval end def add_selectable io @selectables[io.uuid] = io end def get_selectable uuid @selectables[uuid] end def run raise Error.new( "already running" ) if @running @running = true begin open_loopbreaker loop { @current_loop_time = Time.now break if @stop_scheduled run_timers break if @stop_scheduled crank_selectables break if @stop_scheduled run_heartbeats } ensure close_loopbreaker @selectables.each {|k, io| io.close} @selectables.clear @running = false end end def run_timers @timers.each {|t| if t.first <= @current_loop_time @timers.delete t EventMachine::event_callback "", TimerFired, t.last else break end } #while @timers.length > 0 and @timers.first.first <= now # t = @timers.shift # EventMachine::event_callback "", TimerFired, t.last #end end def run_heartbeats if @next_heartbeat <= @current_loop_time @next_heartbeat = @current_loop_time + HeartbeatInterval @selectables.each {|k,io| io.heartbeat} end end def crank_selectables #$stderr.write 'R' readers = @selectables.values.select {|io| io.select_for_reading?} writers = @selectables.values.select {|io| io.select_for_writing?} s = select( readers, writers, nil, @timer_quantum) s and s[1] and s[1].each {|w| w.eventable_write } s and s[0] and s[0].each {|r| r.eventable_read } @selectables.delete_if {|k,io| if io.close_scheduled? io.close true end } end # #stop def stop raise Error.new( "not running") unless @running @stop_scheduled = true end def open_loopbreaker # Can't use an IO.pipe because they can't be set nonselectable in Windows. # Pick a random localhost UDP port. #@loopbreak_writer.close if @loopbreak_writer #rd,@loopbreak_writer = IO.pipe @loopbreak_reader = UDPSocket.new @loopbreak_writer = UDPSocket.new bound = false 100.times { @loopbreak_port = rand(10000) + 40000 begin @loopbreak_reader.bind "localhost", @loopbreak_port bound = true break rescue end } raise "Unable to bind Loopbreaker" unless bound LoopbreakReader.new(@loopbreak_reader) end def close_loopbreaker @loopbreak_writer.close @loopbreak_writer = nil end def signal_loopbreak #@loopbreak_writer.write '+' if @loopbreak_writer @loopbreak_writer.send('+',0,"localhost",@loopbreak_port) if @loopbreak_writer end def set_timer_quantum interval_in_seconds @timer_quantum = interval_in_seconds end end end #-------------------------------------------------------------- class IO extend Forwardable def_delegator :@my_selectable, :close_scheduled? def_delegator :@my_selectable, :select_for_reading? def_delegator :@my_selectable, :select_for_writing? def_delegator :@my_selectable, :eventable_read def_delegator :@my_selectable, :eventable_write def_delegator :@my_selectable, :uuid def_delegator :@my_selectable, :send_data def_delegator :@my_selectable, :schedule_close def_delegator :@my_selectable, :get_peername def_delegator :@my_selectable, :send_datagram def_delegator :@my_selectable, :get_outbound_data_size def_delegator :@my_selectable, :set_inactivity_timeout def_delegator :@my_selectable, :heartbeat end #-------------------------------------------------------------- module EventMachine class Selectable attr_reader :io, :uuid def initialize io @uuid = UuidGenerator.generate @io = io @last_activity = Reactor.instance.current_loop_time if defined?(Fcntl::F_GETFL) m = @io.fcntl(Fcntl::F_GETFL, 0) @io.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | m) else # Windows doesn't define F_GETFL. # It's not very reliable about setting descriptors nonblocking either. begin s = Socket.for_fd(@io.fileno) s.fcntl( Fcntl::F_SETFL, Fcntl::O_NONBLOCK ) rescue Errno::EINVAL, Errno::EBADF STDERR.puts "Serious error: unable to set descriptor non-blocking" end end # TODO, should set CLOEXEC on Unix? @close_scheduled = false @close_requested = false se = self; @io.instance_eval { @my_selectable = se } Reactor.instance.add_selectable @io end def close_scheduled? @close_scheduled end def select_for_reading? false end def select_for_writing? false end def get_peername nil end def set_inactivity_timeout tm @inactivity_timeout = tm end def heartbeat end end end #-------------------------------------------------------------- module EventMachine class StreamObject < Selectable def initialize io super io @outbound_q = [] end # If we have to close, or a close-after-writing has been requested, # then don't read any more data. def select_for_reading? true unless (@close_scheduled || @close_requested) end # If we have to close, don't select for writing. # Otherwise, see if the protocol is ready to close. # If not, see if he has data to send. # If a close-after-writing has been requested and the outbound queue # is empty, convert the status to close_scheduled. def select_for_writing? unless @close_scheduled if @outbound_q.empty? @close_scheduled = true if @close_requested false else true end end end # Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006. # If we have it, then we can read multiple times safely to improve # performance. # The last-activity clock ASSUMES that we only come here when we # have selected readable. # TODO, coalesce multiple reads into a single event. # TODO, do the function check somewhere else and cache it. def eventable_read @last_activity = Reactor.instance.current_loop_time begin if io.respond_to?(:read_nonblock) 10.times { data = io.read_nonblock(4096) EventMachine::event_callback uuid, ConnectionData, data } else data = io.sysread(4096) EventMachine::event_callback uuid, ConnectionData, data end rescue Errno::EAGAIN, Errno::EWOULDBLOCK # no-op rescue Errno::ECONNRESET, Errno::ECONNREFUSED, EOFError @close_scheduled = true EventMachine::event_callback uuid, ConnectionUnbound, nil end end # Provisional implementation. Will be re-implemented in subclasses. # TODO: Complete this implementation. As it stands, this only writes # a single packet per cycle. Highly inefficient, but required unless # we're running on a Ruby with proper nonblocking I/O (Ruby 1.8.4 # built from sources from May 25, 2006 or newer). # We need to improve the loop so it writes multiple times, however # not more than a certain number of bytes per cycle, otherwise # one busy connection could hog output buffers and slow down other # connections. Also we should coalesce small writes. # URGENT TODO: Coalesce small writes. They are a performance killer. # The last-activity recorder ASSUMES we'll only come here if we've # selected writable. def eventable_write # coalesce the outbound array here, perhaps @last_activity = Reactor.instance.current_loop_time while data = @outbound_q.shift do begin data = data.to_s w = if io.respond_to?(:write_nonblock) io.write_nonblock data else io.syswrite data end if w < data.length @outbound_q.unshift data[w..-1] break end rescue Errno::EAGAIN @outbound_q.unshift data rescue EOFError, Errno::ECONNRESET, Errno::ECONNREFUSED @close_scheduled = true @outbound_q.clear end end end # #send_data def send_data data # TODO, coalesce here perhaps by being smarter about appending to @outbound_q.last? unless @close_scheduled or @close_requested or !data or data.length <= 0 @outbound_q << data.to_s end end # #schedule_close # The application wants to close the connection. def schedule_close after_writing if after_writing @close_requested = true else @close_scheduled = true end end # #get_peername # This is defined in the normal way on connected stream objects. # Return an object that is suitable for passing to Socket#unpack_sockaddr_in or variants. # We could also use a convenience method that did the unpacking automatically. def get_peername io.getpeername end # #get_outbound_data_size def get_outbound_data_size @outbound_q.inject(0) {|memo,obj| memo += (obj || "").length} end def heartbeat if @inactivity_timeout and (@last_activity + @inactivity_timeout) < Reactor.instance.current_loop_time schedule_close true end end end end #-------------------------------------------------------------- module EventMachine class EvmaTCPClient < StreamObject def self.connect host, port, bind_host = nil sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) sd.bind(Socket.pack_sockaddr_in(0, bind_host)) if bind_host begin # TODO, this assumes a current Ruby snapshot. # We need to degrade to a nonblocking connect otherwise. sd.connect_nonblock( Socket.pack_sockaddr_in( port, host )) rescue Errno::EINPROGRESS end EvmaTCPClient.new sd end def initialize io super @pending = true end def select_for_writing? @pending ? true : super end def select_for_reading? @pending ? false : super end def eventable_write if @pending @pending = false if 0 == io.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR).unpack("i").first EventMachine::event_callback uuid, ConnectionCompleted, "" end else super end end end end #-------------------------------------------------------------- module EventMachine class EvmaKeyboard < StreamObject def self.open EvmaKeyboard.new STDIN end def initialize io super end def select_for_writing? false end def select_for_reading? true end end end #-------------------------------------------------------------- module EventMachine class EvmaUNIXClient < StreamObject def self.connect chain sd = Socket.new( Socket::AF_LOCAL, Socket::SOCK_STREAM, 0 ) begin # TODO, this assumes a current Ruby snapshot. # We need to degrade to a nonblocking connect otherwise. sd.connect_nonblock( Socket.pack_sockaddr_un( chain )) rescue Errno::EINPROGRESS end EvmaUNIXClient.new sd end def initialize io super @pending = true end def select_for_writing? @pending ? true : super end def select_for_reading? @pending ? false : super end def eventable_write if @pending @pending = false if 0 == io.getsockopt(Socket::SOL_SOCKET, Socket::SO_ERROR).unpack("i").first EventMachine::event_callback uuid, ConnectionCompleted, "" end else super end end end end #-------------------------------------------------------------- module EventMachine class EvmaTCPServer < Selectable # TODO, refactor and unify with EvmaUNIXServer. class << self # Versions of ruby 1.8.4 later than May 26 2006 will work properly # with an object of type TCPServer. Prior versions won't so we # play it safe and just build a socket. # def start_server host, port sd = Socket.new( Socket::AF_INET, Socket::SOCK_STREAM, 0 ) sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true ) sd.bind( Socket.pack_sockaddr_in( port, host )) sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough. EvmaTCPServer.new sd end end def initialize io super io end def select_for_reading? true end #-- # accept_nonblock returns an array consisting of the accepted # socket and a sockaddr_in which names the peer. # Don't accept more than 10 at a time. def eventable_read begin 10.times { descriptor,peername = io.accept_nonblock sd = StreamObject.new descriptor EventMachine::event_callback uuid, ConnectionAccepted, sd.uuid } rescue Errno::EWOULDBLOCK, Errno::EAGAIN end end #-- # def schedule_close @close_scheduled = true end end end #-------------------------------------------------------------- module EventMachine class EvmaUNIXServer < Selectable # TODO, refactor and unify with EvmaTCPServer. class << self # Versions of ruby 1.8.4 later than May 26 2006 will work properly # with an object of type TCPServer. Prior versions won't so we # play it safe and just build a socket. # def start_server chain sd = Socket.new( Socket::AF_LOCAL, Socket::SOCK_STREAM, 0 ) sd.setsockopt( Socket::SOL_SOCKET, Socket::SO_REUSEADDR, true ) sd.bind( Socket.pack_sockaddr_un( chain )) sd.listen( 50 ) # 5 is what you see in all the books. Ain't enough. EvmaUNIXServer.new sd end end def initialize io super io end def select_for_reading? true end #-- # accept_nonblock returns an array consisting of the accepted # socket and a sockaddr_in which names the peer. # Don't accept more than 10 at a time. def eventable_read begin 10.times { descriptor,peername = io.accept_nonblock sd = StreamObject.new descriptor EventMachine::event_callback uuid, ConnectionAccepted, sd.uuid } rescue Errno::EWOULDBLOCK, Errno::EAGAIN end end #-- # def schedule_close @close_scheduled = true end end end #-------------------------------------------------------------- module EventMachine class LoopbreakReader < Selectable def select_for_reading? true end def eventable_read io.sysread(128) EventMachine::event_callback "", LoopbreakSignalled, "" end end end #-------------------------------------------------------------- module EventMachine class DatagramObject < Selectable def initialize io super io @outbound_q = [] end # #send_datagram def send_datagram data, target # TODO, coalesce here perhaps by being smarter about appending to @outbound_q.last? unless @close_scheduled or @close_requested @outbound_q << [data.to_s, target] end end # #select_for_writing? def select_for_writing? unless @close_scheduled if @outbound_q.empty? @close_scheduled = true if @close_requested false else true end end end # #select_for_reading? def select_for_reading? true end # #get_outbound_data_size def get_outbound_data_size @outbound_q.inject(0) {|memo,obj| memo += (obj || "").length} end end end #-------------------------------------------------------------- module EventMachine class EvmaUDPSocket < DatagramObject class << self def create host, port sd = Socket.new( Socket::AF_INET, Socket::SOCK_DGRAM, 0 ) sd.bind Socket::pack_sockaddr_in( port, host ) EvmaUDPSocket.new sd end end # #eventable_write # This really belongs in DatagramObject, but there is some UDP-specific stuff. def eventable_write 40.times { break if @outbound_q.empty? begin data,target = @outbound_q.first # This damn better be nonblocking. io.send data.to_s, 0, target @outbound_q.shift rescue Errno::EAGAIN # It's not been observed in testing that we ever get here. # True to the definition, packets will be accepted and quietly dropped # if the system is under pressure. break rescue EOFError, Errno::ECONNRESET @close_scheduled = true @outbound_q.clear end } end # Proper nonblocking I/O was added to Ruby 1.8.4 in May 2006. # If we have it, then we can read multiple times safely to improve # performance. def eventable_read begin if io.respond_to?(:recvfrom_nonblock) 40.times { data,@return_address = io.recvfrom_nonblock(16384) EventMachine::event_callback uuid, ConnectionData, data @return_address = nil } else raise "unimplemented datagram-read operation on this Ruby" end rescue Errno::EAGAIN # no-op rescue Errno::ECONNRESET, EOFError @close_scheduled = true EventMachine::event_callback uuid, ConnectionUnbound, nil end end def send_data data send_datagram data, @return_address end end end #--------------------------------------------------------------