Sha256: 1ba4447da994b4b9d3fa2b798e32411dc9752459c064c717b7303a4a94cf3008

Contents?: true

Size: 1.07 KB

Versions: 2

Compression:

Stored size: 1.07 KB

Contents

require "rubygems"
require "timeout"
require "thread"
require "socket"

module Recognizer
  class TCP
    def initialize(carbon_queue, logger, options)
      unless carbon_queue && options.is_a?(Hash)
        raise "You must provide a thread queue and options"
      end

      options[:tcp] ||= Hash.new

      threads = options[:tcp][:threads] || 20
      port    = options[:tcp][:port]    || 2003

      tcp_server      = TCPServer.new("0.0.0.0", port)
      tcp_connections = Queue.new

      Thread.abort_on_exception = true

      threads.times do
        Thread.new do
          loop do
            if connection = tcp_connections.shift
              while line = connection.gets
                line = line.strip
                if line.split("\s").count == 3
                  carbon_queue.push(line)
                end
              end
            end
          end
        end
      end

      Thread.new do
        logger.info("TCP -- Awaiting metrics with impatience ...")
        loop do
          tcp_connections.push(tcp_server.accept)
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
recognizer-0.1.6 lib/recognizer/tcp.rb
recognizer-0.1.5 lib/recognizer/tcp.rb