Sha256: 168282aadd2b62254850950a8425d65688f8320abc763c1897e8214c45ff1213

Contents?: true

Size: 1.64 KB

Versions: 3

Compression:

Stored size: 1.64 KB

Contents

require "rubygems"
require "thread"
require "bunny"

module Recognizer
  class AMQP
    def initialize(carbon_queue, logger, options)
      unless carbon_queue && options.is_a?(Hash)
        raise "You must provide a thread queue and options"
      end

      if options.has_key?(:amqp)
        options[:amqp][:exchange] ||= Hash.new

        exchange_name = options[:amqp][:exchange][:name]        || "graphite"
        durable       = options[:amqp][:exchange][:durable]     || false
        routing_key   = options[:amqp][:exchange][:routing_key] || "#"
        exchange_type = options[:amqp][:exchange][:type]        || :topic

        amqp = Bunny.new(options[:amqp].reject { |key, value| key == :exchange })
        amqp.start

        exchange = amqp.exchange(exchange_name, :type => exchange_type.to_sym, :durable => durable)
        queue = amqp.queue("recognizer")
        queue.bind(exchange, :key => routing_key)

        Thread.abort_on_exception = true

        Thread.new do
          logger.info("AMQP -- Awaiting metrics with impatience ...")
          queue.subscribe do |message|
            payload         = message[:payload]
            msg_routing_key = message[:routing_key] || message[:delivery_details][:routing_key]
            lines = payload.split("\n")
            lines.each do |line|
              line = line.strip
              case line.split("\s").count
              when 3
                carbon_queue.push(line)
              when 2
                carbon_queue.push("#{msg_routing_key} #{line}")
              end
            end
          end
        end
      else
        logger.warn("AMQP -- Not configured")
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
recognizer-0.1.2 lib/recognizer/amqp.rb
recognizer-0.1.1 lib/recognizer/amqp.rb
recognizer-0.1.0 lib/recognizer/amqp.rb