lib/fluq/input/socket.rb in fluq-0.7.5 vs lib/fluq/input/socket.rb in fluq-0.8.0
- old
+ new
@@ -1,50 +1,80 @@
class FluQ::Input::Socket < FluQ::Input::Base
+ MAXLEN = 16 * 1024
# @attr_reader [URI] url the URL
attr_reader :url
# Constructor.
# @option options [String] :bind the URL to bind to
# @raises [ArgumentError] when no bind URL provided
# @raises [URI::InvalidURIError] if invalid URL is given
# @example Launch a server
#
- # server = FluQ::Server.new(reactor, bind: "tcp://localhost:7654")
+ # server = FluQ::Server.new(runner, bind: "tcp://localhost:7654")
#
def initialize(*)
super
+ end
- raise ArgumentError, 'No URL to bind to provided, make sure you pass :bind option' unless config[:bind]
- @url = FluQ::URL.parse(config[:bind], protocols)
+ # @return [String] short name
+ def name
+ @url ? @url.scheme : super
end
# @return [String] descriptive name
- def name
- @name ||= "#{super} (#{@url})"
+ def description
+ "socket (#{@url})"
end
# Start the server
def run
- args = [self.class::Connection, self]
- case @url.scheme
+ super
+
+ @server = case @url.scheme
when 'tcp'
- EventMachine.start_server @url.host, @url.port, *args
+ TCPServer.new(@url.host, @url.port)
when 'udp'
- EventMachine.open_datagram_socket @url.host, @url.port, *args
+ UDPSocket.new.tap {|s| s.bind(@url.host, @url.port) }
when 'unix'
- EventMachine.start_server @url.path, *args
+ UNIXServer.open(@url.path)
end
+
+ case @url.scheme
+ when 'udp'
+ loop { process @server.recvfrom(MAXLEN)[0] }
+ else
+ loop { async.handle_connection @server.accept }
+ end
end
+ # @return [Boolean] true when listening
+ def listening?
+ !!@server
+ end
+
protected
- # @return [Array] supported protocols
- def protocols
- ["tcp", "udp", "unix"]
+ # @abstract callback for configuration initialization
+ def configure
+ raise ArgumentError, 'No URL to bind to provided, make sure you pass :bind option' unless config[:bind]
+ @url = FluQ::URL.parse config[:bind], %w|tcp udp unix|
end
-end
+ # Handle a single connection
+ def handle_connection(socket)
+ loop do
+ process socket.readpartial(MAXLEN)
+ end
+ rescue EOFError
+ ensure
+ socket.close
+ end
-%w'connection'.each do |name|
- require "fluq/input/socket/#{name}"
+ def before_terminate
+ return unless @server
+
+ @server.close
+ File.delete @url.path if @url.scheme == "unix"
+ end
+
end