lib/mouth/sucker.rb in mouth-0.8.1 vs lib/mouth/sucker.rb in mouth-0.8.2

- old
+ new

@@ -18,49 +18,60 @@ # Host/Port to suck UDP packets on attr_accessor :host attr_accessor :port # Actual EM::Mongo connection - attr_accessor :mongo_db + attr_accessor :mongo # Info to connect to mongo attr_accessor :mongo_db_name - attr_accessor :mongo_hosts + attr_accessor :mongo_hostports # Accumulators of our data attr_accessor :counters attr_accessor :timers + attr_accessor :gauges # Stats attr_accessor :udp_packets_received attr_accessor :mongo_flushes def initialize(options = {}) self.host = options[:host] || "localhost" self.port = options[:port] || 8889 self.mongo_db_name = options[:mongo_db_name] || "mouth" - self.mongo_hosts = options[:mongo_hosts] || ["localhost"] + hostports = options[:mongo_hostports] || [["localhost", EM::Mongo::DEFAULT_PORT]] + self.mongo_hostports = hostports.collect do |hp| + if hp.is_a?(String) + host, port = hp.split(":") + [host, port || EM::Mongo::DEFAULT_PORT] + else + hp + end + end self.udp_packets_received = 0 self.mongo_flushes = 0 self.counters = {} self.timers = {} + self.gauges = {} end def suck! EM.run do # Connect to mongo now - self.mongo_db + self.mongo EM.open_datagram_socket host, port, SuckerConnection do |conn| conn.sucker = self end EM.add_periodic_timer(10) do Mouth.logger.info "Counters: #{self.counters.inspect}" Mouth.logger.info "Timers: #{self.timers.inspect}" + Mouth.logger.info "Gauges: #{self.gauges.inspect}" self.flush! self.set_procline! end EM.next_tick do @@ -71,22 +82,22 @@ end # counter: gorets:1|c # counter w/ sampling: gorets:1|c|@0.1 # timer: glork:320|ms - # (future) gauge: gaugor:333|g + # gauge: gaugor:333|g def store!(data) key_value, command_sampling = data.to_s.split("|", 2) key, value = key_value.to_s.split(":") command, sampling = command_sampling.to_s.split("|") return unless key && value && command && key.length > 0 && value.length > 0 && command.length > 0 key = Mouth.parse_key(key).join(".") value = value.to_f - ts = minute_timestamps + ts = Mouth.current_timestamp if command == "ms" self.timers[ts] ||= {} self.timers[ts][key] ||= [] self.timers[ts][key] << value @@ -97,17 +108,20 @@ factor = (factor == 0.0 || factor > 1.0) ? 1.0 : 1.0 / factor end self.counters[ts] ||= {} self.counters[ts][key] ||= 0.0 self.counters[ts][key] += value * factor + elsif command == "g" + self.gauges[ts] ||= {} + self.gauges[ts][key] = value end self.udp_packets_received += 1 end def flush! - ts = minute_timestamps + ts = Mouth.current_timestamp limit_ts = ts - 1 mongo_docs = {} # We're going to construct mongo_docs which look like this: # "mycollections:234234": { # NOTE: this timpstamp will be popped into .t = 234234 @@ -115,11 +129,12 @@ # happenings: 37, # affairs: 3 # }, # m: { # occasions: {...} - # } + # }, + # g: {things: 3} # } self.counters.each do |cur_ts, counters_to_save| if cur_ts <= limit_ts counters_to_save.each do |counter_key, value| @@ -134,10 +149,26 @@ self.counters.delete(cur_ts) end end + self.gauges.each do |cur_ts, gauges_to_save| + if cur_ts <= limit_ts + gauges_to_save.each do |gauge_key, value| + ns, sub_key = Mouth.parse_key(gauge_key) + mongo_key = "#{ns}:#{ts}" + mongo_docs[mongo_key] ||= {} + + cur_mongo_doc = mongo_docs[mongo_key] + cur_mongo_doc["g"] ||= {} + cur_mongo_doc["g"][sub_key] = value + end + + self.gauges.delete(cur_ts) + end + end + self.timers.each do |cur_ts, timers_to_save| if cur_ts <= limit_ts timers_to_save.each do |timer_key, values| ns, sub_key = Mouth.parse_key(timer_key) mongo_key = "#{ns}:#{ts}" @@ -158,37 +189,33 @@ def save_documents!(mongo_docs) Mouth.logger.info "Saving Docs: #{mongo_docs.inspect}" mongo_docs.each do |key, doc| ns, ts = key.split(":") - collection_name = "mouth_#{ns}" + collection_name = Mouth.mongo_collection_name(ns) doc["t"] = ts.to_i - self.mongo_db.collection(collection_name).insert(doc) + self.mongo.collection(collection_name).insert(doc) end self.mongo_flushes += 1 if mongo_docs.any? end - def mongo_db - @mongo_db ||= begin - if self.mongo_hosts.length == 1 - EM::Mongo::Connection.new(self.mongo_hosts.first).db(self.mongo_db_name) + def mongo + @mongo ||= begin + if self.mongo_hostports.length == 1 + EM::Mongo::Connection.new(*self.mongo_hostports.first).db(self.mongo_db_name) else - raise "TODO: ability to connect to a replica set." + raise "Ability to connect to a replica set not implemented." end end end def set_procline! $0 = "mouth [started] [UDP Recv: #{self.udp_packets_received}] [Mongo saves: #{self.mongo_flushes}]" end private - - def minute_timestamps - Time.now.to_i / 60 - end def analyze_timer(values) values.sort! count = values.length