require 'eventmachine'
require 'msgpack'

require 'ganymed/event'
require 'ganymed/mongodb'
require 'ganymed/websocket'

module Ganymed
  # Receives MessagePack-encoded datagrams on a UDP socket, records
  # origin/namespace metadata and event values in MongoDB, and forwards
  # both to the connected websocket clients.
  class Processor
    # Builds the MongoDB, websocket and client helpers from +config+ and
    # immediately starts listening for datagrams.
    #
    # @param config configuration object; this class reads
    #   +processor.mongodb+, +processor.websocket+, +processor.listen+,
    #   +client.sampler+ and +resolution+ from it.
    def initialize(config)
      @config = config
      @db = MongoDB.new(config.processor.mongodb)
      @websocket = Websocket.new(config.processor.websocket, @db)
      @client = Client.new(:sampler => @config.client.sampler)
      listen!
    end

    # Opens the UDP datagram socket described by +processor.listen+ and
    # hands every new connection a reference to this processor.
    #
    # @return the +processor.listen+ configuration object (result of +tap+)
    def listen!
      @config.processor.listen.tap do |listen|
        log.info("processing metrics on udp##{listen.host}:#{listen.port}")
        EM.open_datagram_socket(listen.host, listen.port, Connection) do |connection|
          connection.processor = self
        end
      end
    end

    # Decodes one datagram and dispatches it according to its +_type+ field.
    # Packets with a missing or unrecognised +_type+ are ignored.
    #
    # @param data [String] raw MessagePack payload
    def process(data)
      data = MessagePack.unpack(data)

      # Compare as strings: the type comes straight off the network, so
      # avoid interning arbitrary input as symbols and tolerate a missing
      # '_type' key instead of raising on nil.
      case data['_type'].to_s
      when 'event'
        process_event(Event.parse(data))
        @client.sampler.emit(:counter, 'ganymed.processor.events', 1)
      when 'metadata'
        process_metadata(data)
      end
    end

    # Registers the event's namespace for its origin, publishes the event to
    # websocket clients and, unless its resolution is within
    # +1..config.resolution+, upserts its value into the namespace collection.
    #
    # @param event parsed event; this method reads +origin+, +ns+,
    #   +resolution+, +cf+, +timestamp+ and +value+ from it.
    def process_event(event)
      # insert origin + namespace into metadata
      #
      # find_and_modify is called without `new: true`, so it yields the
      # document as it was *before* the update. When the upsert creates the
      # document there is no previous version and the result may be nil
      # rather than an empty document; fall back to an empty hash so the
      # defaults below can be applied either way.
      metadata = @db['metadata'].find_and_modify({
        query: {:_id => event.origin},
        update: {:$addToSet => {:namespaces => event.ns}},
        upsert: true,
      }) || {}

      # set defaults in case find_and_modify returned no (or an empty) document
      metadata['_id'] ||= event.origin
      metadata['namespaces'] ||= []

      # if metadata has changed notify websocket clients
      unless metadata['namespaces'].include?(event.ns)
        # bring the pre-update copy in line with what was just stored
        metadata['namespaces'] |= [event.ns]
        @websocket.send(:metadata, [metadata])
      end

      # send the event to websocket clients
      @websocket.each do |connection|
        connection.publish(event)
      end

      # skip events that are produced below the mongodb storage threshold
      return if (1..@config.resolution).include?(event.resolution)

      # insert event into ns collection
      @db.collection(event.ns).update({
        :c => event.cf,
        :o => event.origin,
        :t => event.timestamp
      }, {
        :$set => {:v => event.value}
      }, :upsert => true)
    end

    # Stores the +data+ payload of a metadata packet on the document of its
    # origin (creating it if necessary) and sends the updated document to
    # websocket clients.
    #
    # @param data [Hash] decoded packet; +'origin'+ and +'data'+ are read
    def process_metadata(data)
      metadata = @db['metadata'].find_and_modify({
        query: {:_id => data['origin']},
        update: {:$set => {:data => data['data']}},
        upsert: true,
        new: true # return the document as it looks after the update
      })

      @websocket.send(:metadata, [metadata])
    end

    # EventMachine handler for the UDP socket opened in {Processor#listen!}.
    #
    # @private
    module Connection
      # The Processor that receives incoming datagrams; assigned in
      # Processor#listen! when the socket is opened.
      attr_accessor :processor

      # Hands the datagram to the processor via EM.defer and logs any
      # error raised while processing it.
      #
      # @param data [String] raw datagram payload
      def receive_data(data)
        EM.defer do
          begin
            processor.process(data)
          # NOTE(review): rescues Exception rather than StandardError,
          # presumably so that nothing raised while handling a datagram can
          # escape the deferred block -- confirm before narrowing.
          rescue Exception => exc
            log.exception(exc)
          end
        end
      end
    end
  end
end