Sha256: 757a7d0afdd9f5c16b9030c9e79d93de1b8116d645e11fcd9df55bc28fa2455c

Contents?: true

Size: 1.69 KB

Versions: 2

Compression:

Stored size: 1.69 KB

Contents

class FluQ::Input::Socket < FluQ::Input::Base
  MAXLEN = 16 * 1024

  # @attr_reader [URI] url the URL
  attr_reader :url

  # Constructor.
  # @option options [String] :bind the URL to bind to
  # @raises [ArgumentError] when no bind URL provided
  # @raises [URI::InvalidURIError] if invalid URL is given
  # @example Launch a server
  #
  #   server = FluQ::Server.new(runner, bind: "tcp://localhost:7654")
  #
  def initialize(*)
    super
  end

  # @return [String] short name
  def name
    @url ? @url.scheme : super
  end

  # @return [String] descriptive name
  def description
    "socket (#{@url})"
  end

  # Start the server
  def run
    super

    @server = case @url.scheme
    when 'tcp'
      TCPServer.new(@url.host, @url.port)
    when 'udp'
      UDPSocket.new.tap {|s| s.bind(@url.host, @url.port) }
    when 'unix'
      UNIXServer.open(@url.path)
    end

    case @url.scheme
    when 'udp'
      loop { process @server.recvfrom(MAXLEN)[0] }
    else
      loop { async.handle_connection @server.accept }
    end
  end

  # @return [Boolean] true when listening
  def listening?
    !!@server
  end

  protected

    # @abstract callback for configuration initialization
    def configure
      raise ArgumentError, 'No URL to bind to provided, make sure you pass :bind option' unless config[:bind]
      @url = FluQ::URL.parse config[:bind], %w|tcp udp unix|
    end

    # Handle a single connection
    def handle_connection(socket)
      loop do
        process socket.readpartial(MAXLEN)
      end
    rescue EOFError
    ensure
      socket.close
    end

    def before_terminate
      return unless @server

      @server.close
      File.delete @url.path if @url.scheme == "unix"
    end

end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluq-0.8.1 lib/fluq/input/socket.rb
fluq-0.8.0 lib/fluq/input/socket.rb