Sha256: f5f9db4a647465c2135d4883d1c6b668066c775d8631fd45f8265edbe5ec76c0
Contents?: true
Size: 1.15 KB
Versions: 1
Compression:
Stored size: 1.15 KB
Contents
require "rubygems" require "thread" require "bunny" module Recognizer class AMQP def initialize(thread_queue, options) unless thread_queue && options.is_a?(Hash) raise "You must provide a thread queue and options" end amqp = Bunny.new(options[:amqp]) amqp.start queue = amqp.queue("recognizer") exchange = amqp.exchange("graphite", :type => :topic, :durable => true) queue.bind(exchange, :key => "*") Thread.abort_on_exception = true consumer = Thread.new do puts "Awaiting the metrics with impatience ..." queue.subscribe do |message| payload = message[:payload] begin metrics = JSON.parse(payload) if metrics.is_a?(Array) metrics.each do |metric| if metric.split(" ").count == 3 thread_queue.push(metric) end end end rescue JSON::ParserError metric = payload if metric.split(" ").count == 3 thread_queue.push(metric) end end end end consumer.join end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
recognizer-0.0.1 | lib/recognizer/amqp.rb |