Sha256: 58d2cfd6e28c356b4bab3169096013afac6c41bb74a12d4b08192190ce569540

Contents?: true

Size: 1.34 KB

Versions: 2

Compression:

Stored size: 1.34 KB

Contents

require "rubygems"
require "timeout"
require "thread"
require "socket"

module Recognizer
  # TCP listener that reads newline-delimited text from clients and pushes
  # every line made of exactly three whitespace-separated fields onto a queue
  # (presumably Carbon's plaintext "path value timestamp" format -- confirm
  # against the consumer of the queue).
  class TCP
    # Number of worker threads used when options[:tcp][:threads] is not set.
    DEFAULT_THREADS = 20

    # Port listened on when options[:tcp][:port] is not set.
    DEFAULT_PORT = 2003

    # Seconds a worker waits for the next line before dropping a connection.
    READ_TIMEOUT = 12

    # Number of whitespace-separated fields a line must have to be forwarded.
    FIELDS_PER_METRIC = 3

    # Binds the listening socket and starts the worker and acceptor threads.
    # Returns as soon as the threads are started; it does not block.
    #
    # @param carbon_queue [Queue] receives each accepted (stripped) line
    # @param logger [#info, #warn] destination for status messages
    # @param options [Hash] settings; options[:tcp] may carry :threads and
    #   :port. options[:tcp] is created on the given hash when missing.
    # @raise [RuntimeError] if carbon_queue is nil/false or options is not a Hash
    def initialize(carbon_queue, logger, options)
      unless carbon_queue && options.is_a?(Hash)
        raise "You must provide a thread queue and options"
      end

      options[:tcp] ||= Hash.new

      threads = options[:tcp][:threads] || DEFAULT_THREADS
      port    = options[:tcp][:port]    || DEFAULT_PORT

      tcp_server      = TCPServer.new("0.0.0.0", port)
      tcp_connections = Queue.new

      # An unexpected error in any thread takes the whole process down rather
      # than leaving a silently dead worker behind.
      Thread.abort_on_exception = true

      # Workers: each one serves a single accepted connection at a time.
      threads.times do
        Thread.new do
          loop do
            connection = tcp_connections.shift
            handle_connection(connection, carbon_queue, logger) if connection
          end
        end
      end

      # Acceptor: hands every new client socket over to the workers.
      Thread.new do
        logger.info("TCP -- Awaiting metrics with impatience ...")
        loop do
          tcp_connections.push(tcp_server.accept)
        end
      end
    end

    private

    # Reads lines from one client until it disconnects, forwarding the
    # well-formed ones, and always closes the socket afterwards.
    def handle_connection(connection, carbon_queue, logger)
      # gets returns nil at EOF, which ends the loop. The timeout applies to
      # each read, so an idle client is dropped after READ_TIMEOUT seconds.
      while (line = Timeout.timeout(READ_TIMEOUT) { connection.gets })
        line = line.strip
        carbon_queue.push(line) if line.split(" ").size == FIELDS_PER_METRIC
      end
    rescue Timeout::Error
      logger.warn("TCP -- A connection has timed out")
    rescue IOError, SystemCallError => error
      # A misbehaving peer (e.g. connection reset) must not abort the process.
      logger.warn("TCP -- A connection has failed: #{error.message}")
    ensure
      connection.close unless connection.closed?
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
recognizer-0.1.4 lib/recognizer/tcp.rb
recognizer-0.1.3 lib/recognizer/tcp.rb