lib/fluq/input/socket.rb in fluq-0.7.5 vs lib/fluq/input/socket.rb in fluq-0.8.0

- old
+ new

@@ -1,50 +1,80 @@ class FluQ::Input::Socket < FluQ::Input::Base + MAXLEN = 16 * 1024 # @attr_reader [URI] url the URL attr_reader :url # Constructor. # @option options [String] :bind the URL to bind to # @raises [ArgumentError] when no bind URL provided # @raises [URI::InvalidURIError] if invalid URL is given # @example Launch a server # - # server = FluQ::Server.new(reactor, bind: "tcp://localhost:7654") + # server = FluQ::Server.new(runner, bind: "tcp://localhost:7654") # def initialize(*) super + end - raise ArgumentError, 'No URL to bind to provided, make sure you pass :bind option' unless config[:bind] - @url = FluQ::URL.parse(config[:bind], protocols) + # @return [String] short name + def name + @url ? @url.scheme : super end # @return [String] descriptive name - def name - @name ||= "#{super} (#{@url})" + def description + "socket (#{@url})" end # Start the server def run - args = [self.class::Connection, self] - case @url.scheme + super + + @server = case @url.scheme when 'tcp' - EventMachine.start_server @url.host, @url.port, *args + TCPServer.new(@url.host, @url.port) when 'udp' - EventMachine.open_datagram_socket @url.host, @url.port, *args + UDPSocket.new.tap {|s| s.bind(@url.host, @url.port) } when 'unix' - EventMachine.start_server @url.path, *args + UNIXServer.open(@url.path) end + + case @url.scheme + when 'udp' + loop { process @server.recvfrom(MAXLEN)[0] } + else + loop { async.handle_connection @server.accept } + end end + # @return [Boolean] true when listening + def listening? + !!@server + end + protected - # @return [Array] supported protocols - def protocols - ["tcp", "udp", "unix"] + # @abstract callback for configuration initialization + def configure + raise ArgumentError, 'No URL to bind to provided, make sure you pass :bind option' unless config[:bind] + @url = FluQ::URL.parse config[:bind], %w|tcp udp unix| end -end + # Handle a single connection + def handle_connection(socket) + loop do + process socket.readpartial(MAXLEN) + end + rescue EOFError + ensure + socket.close + end -%w'connection'.each do |name| - require "fluq/input/socket/#{name}" + def before_terminate + return unless @server + + @server.close + File.delete @url.path if @url.scheme == "unix" + end + end