Sha256: 757a7d0afdd9f5c16b9030c9e79d93de1b8116d645e11fcd9df55bc28fa2455c
Contents?: true
Size: 1.69 KB
Versions: 2
Compression:
Stored size: 1.69 KB
Contents
# Input that binds to a TCP, UDP or UNIX socket URL and hands every chunk of
# received data to `process`.
class FluQ::Input::Socket < FluQ::Input::Base
  # Maximum number of bytes requested per `readpartial` / `recvfrom` call.
  MAXLEN = 16 * 1024

  # @attr_reader [URI] url the URL
  attr_reader :url

  # Constructor.
  # @option options [String] :bind the URL to bind to
  # @raise [ArgumentError] when no bind URL provided
  # @raise [URI::InvalidURIError] if invalid URL is given
  # @example Launch a server
  #
  #   server = FluQ::Server.new(runner, bind: "tcp://localhost:7654")
  #
  def initialize(*)
    super
  end

  # @return [String] short name: the URL scheme once the URL has been parsed,
  #   otherwise the superclass' name
  def name
    @url ? @url.scheme : super
  end

  # @return [String] descriptive name
  def description
    "socket (#{@url})"
  end

  # Start the server: bind a socket matching the URL scheme, then loop
  # forever feeding incoming data to `process`.
  def run
    super

    @server = case @url.scheme
              when 'tcp'
                TCPServer.new(@url.host, @url.port)
              when 'udp'
                UDPSocket.new.tap { |s| s.bind(@url.host, @url.port) }
              when 'unix'
                UNIXServer.open(@url.path)
              end

    if @url.scheme == 'udp'
      # UDP is connectionless: each datagram is processed directly.
      loop { process @server.recvfrom(MAXLEN)[0] }
    else
      # Stream sockets: every accepted connection is dispatched via `async`.
      loop { async.handle_connection @server.accept }
    end
  end

  # @return [Boolean] true once the server socket has been created
  def listening?
    !@server.nil?
  end

  protected

  # @abstract callback for configuration initialization
  # @raise [ArgumentError] when the :bind option is missing
  def configure
    raise ArgumentError, 'No URL to bind to provided, make sure you pass :bind option' unless config[:bind]

    @url = FluQ::URL.parse config[:bind], %w[tcp udp unix]
  end

  # Handle a single connection: read chunks of up to MAXLEN bytes until the
  # peer disconnects, then close the socket.
  # @param socket [IO] the accepted client connection
  def handle_connection(socket)
    loop do
      process socket.readpartial(MAXLEN)
    end
  rescue EOFError, Errno::ECONNRESET
    # The peer closed (EOF) or reset the connection; both simply end this
    # connection and must not propagate out of the handler.
  ensure
    socket.close
  end

  # Termination callback: close the listening socket and, for UNIX sockets,
  # remove the socket file from disk.
  def before_terminate
    return unless @server

    @server.close unless @server.closed?
    return unless @url.scheme == "unix"

    begin
      File.delete @url.path
    rescue Errno::ENOENT
      # Socket file is already gone; nothing left to clean up.
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
fluq-0.8.1 | lib/fluq/input/socket.rb |
fluq-0.8.0 | lib/fluq/input/socket.rb |