Sha256: 5ff196adec393036a657df0b0ff7fd85b08290a90dff3bc646f2b298041cb916

Contents?: true

Size: 1.04 KB

Versions: 5

Compression:

Stored size: 1.04 KB

Contents

require "rubygems"
require "thread"
require "bunny"

module Recognizer
  class AMQP
    def initialize(thread_queue, options)
      unless thread_queue && options.is_a?(Hash)
        raise "You must provide a thread queue and options"
      end
      amqp = Bunny.new(options[:amqp])
      amqp.start
      queue = amqp.queue("recognizer")
      exchange = amqp.exchange("graphite", :type => :topic, :durable => true)
      queue.bind(exchange, :key => "*")
      Thread.abort_on_exception = true
      consumer = Thread.new do
        puts "Awaiting the metrics with impatience ..."
        queue.subscribe do |message|
          payload = message[:payload]
          routing_key = message[:routing_key]
          lines = payload.split("\n")
          lines.each do |line|
            line = line.strip
            case line.split(" ").count
            when 3
              thread_queue.push(line)
            when 2
              thread_queue.push("#{routing_key} #{line}")
            end
          end
        end
      end
      consumer.join
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
recognizer-0.0.7 lib/recognizer/amqp.rb
recognizer-0.0.6 lib/recognizer/amqp.rb
recognizer-0.0.4 lib/recognizer/amqp.rb
recognizer-0.0.3 lib/recognizer/amqp.rb
recognizer-0.0.2 lib/recognizer/amqp.rb