lib/recognizer/amqp.rb in recognizer-0.0.10 vs lib/recognizer/amqp.rb in recognizer-0.1.0
- old
+ new
@@ -2,48 +2,50 @@
require "thread"
require "bunny"
module Recognizer
class AMQP
- def initialize(thread_queue, options)
- unless thread_queue && options.is_a?(Hash)
+ def initialize(carbon_queue, logger, options)
+ unless carbon_queue && options.is_a?(Hash)
raise "You must provide a thread queue and options"
end
- options[:amqp] ||= Hash.new
- options[:amqp][:exchange] ||= Hash.new
+ if options.has_key?(:amqp)
+ options[:amqp][:exchange] ||= Hash.new
- exchange_name = options[:amqp][:exchange][:name] || "graphite"
- durable = options[:amqp][:exchange][:durable] || false
- routing_key = options[:amqp][:exchange][:routing_key] || "#"
- exchange_type = options[:amqp][:exchange][:type] || :topic
+ exchange_name = options[:amqp][:exchange][:name] || "graphite"
+ durable = options[:amqp][:exchange][:durable] || false
+ routing_key = options[:amqp][:exchange][:routing_key] || "#"
+ exchange_type = options[:amqp][:exchange][:type] || :topic
- amqp = Bunny.new(options[:amqp].reject { |key, value| key == :exchange })
- amqp.start
+ amqp = Bunny.new(options[:amqp].reject { |key, value| key == :exchange })
+ amqp.start
- exchange = amqp.exchange(exchange_name, :type => exchange_type.to_sym, :durable => durable)
- queue = amqp.queue("recognizer")
- queue.bind(exchange, :key => routing_key)
+ exchange = amqp.exchange(exchange_name, :type => exchange_type.to_sym, :durable => durable)
+ queue = amqp.queue("recognizer")
+ queue.bind(exchange, :key => routing_key)
- Thread.abort_on_exception = true
+ Thread.abort_on_exception = true
- consumer = Thread.new do
- puts "Awaiting the metrics with impatience ..."
- queue.subscribe do |message|
- payload = message[:payload]
- msg_routing_key = message[:routing_key] || message[:delivery_details][:routing_key]
- lines = payload.split("\n")
- lines.each do |line|
- line = line.strip
- case line.split("\s").count
- when 3
- thread_queue.push(line)
- when 2
- thread_queue.push("#{msg_routing_key} #{line}")
+ Thread.new do
+ logger.info("AMQP -- Awaiting metrics with impatience ...")
+ queue.subscribe do |message|
+ payload = message[:payload]
+ msg_routing_key = message[:routing_key] || message[:delivery_details][:routing_key]
+ lines = payload.split("\n")
+ lines.each do |line|
+ line = line.strip
+ case line.split("\s").count
+ when 3
+ carbon_queue.push(line)
+ when 2
+ carbon_queue.push("#{msg_routing_key} #{line}")
+ end
end
end
end
+ else
+ logger.warn("AMQP -- Not configured")
end
- consumer.join
end
end
end