lib/recognizer/amqp.rb in recognizer-0.0.7 vs lib/recognizer/amqp.rb in recognizer-0.0.8
- old
+ new
@@ -6,28 +6,40 @@
class AMQP
def initialize(thread_queue, options)
unless thread_queue && options.is_a?(Hash)
raise "You must provide a thread queue and options"
end
- amqp = Bunny.new(options[:amqp])
+
+ options[:amqp] ||= Hash.new
+ options[:amqp][:exchange] ||= Hash.new
+
+ exchange_name = options[:amqp][:exchange][:name] || "graphite"
+ durable = options[:amqp][:exchange][:durable] || false
+ routing_key = options[:amqp][:exchange][:routing_key] || "#"
+ exchange_type = options[:amqp][:exchange][:type] || :topic
+
+ amqp = Bunny.new(options[:amqp].reject { |key, value| key == :exchange })
amqp.start
+
+ exchange = amqp.exchange(exchange_name, :type => exchange_type.to_sym, :durable => durable)
queue = amqp.queue("recognizer")
- exchange = amqp.exchange("graphite", :type => :topic, :durable => true)
- queue.bind(exchange, :key => "*")
+ queue.bind(exchange, :key => routing_key)
+
Thread.abort_on_exception = true
+
consumer = Thread.new do
puts "Awaiting the metrics with impatience ..."
queue.subscribe do |message|
- payload = message[:payload]
- routing_key = message[:routing_key]
+ payload = message[:payload]
+ msg_routing_key = message[:routing_key] || message[:delivery_details][:routing_key]
lines = payload.split("\n")
lines.each do |line|
line = line.strip
- case line.split(" ").count
+ case line.split("\s").count
when 3
thread_queue.push(line)
when 2
- thread_queue.push("#{routing_key} #{line}")
+ thread_queue.push("#{msg_routing_key} #{line}")
end
end
end
end
consumer.join