Sha256: f5f9db4a647465c2135d4883d1c6b668066c775d8631fd45f8265edbe5ec76c0

Contents?: true

Size: 1.15 KB

Versions: 1

Compression:

Stored size: 1.15 KB

Contents

require "rubygems"
require "thread"
require "bunny"

module Recognizer
  class AMQP
    def initialize(thread_queue, options)
      unless thread_queue && options.is_a?(Hash)
        raise "You must provide a thread queue and options"
      end
      amqp = Bunny.new(options[:amqp])
      amqp.start
      queue = amqp.queue("recognizer")
      exchange = amqp.exchange("graphite", :type => :topic, :durable => true)
      queue.bind(exchange, :key => "*")
      Thread.abort_on_exception = true
      consumer = Thread.new do
        puts "Awaiting the metrics with impatience ..."
        queue.subscribe do |message|
          payload = message[:payload]
          begin
            metrics = JSON.parse(payload)
            if metrics.is_a?(Array)
              metrics.each do |metric|
                if metric.split(" ").count == 3
                  thread_queue.push(metric)
                end
              end
            end
          rescue JSON::ParserError
            metric = payload
            if metric.split(" ").count == 3
              thread_queue.push(metric)
            end
          end
        end
      end
      consumer.join
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
recognizer-0.0.1 lib/recognizer/amqp.rb