lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.1.1 vs lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.2.0

- old
+ new

@@ -21,50 +21,63 @@ class MetricSenseOutput < Fluent::BufferedOutput Fluent::Plugin.register_output('metricsense', self) BACKENDS = {} + def self.register_backend(name, klass) + BACKENDS[name] = klass + end + + module UpdateMode + ADD = 0 + LATEST = 1 + end + class Backend + UpdateMode = MetricSenseOutput::UpdateMode include Configurable - def start end - def shutdown end end - backend_dir = "#{File.dirname(__FILE__)}/backend" - Dir.glob("#{backend_dir}/*_backend.rb") {|e| - require e - } + module Backends + backend_dir = "#{File.dirname(__FILE__)}/backends" + require "#{backend_dir}/librato_backend" + require "#{backend_dir}/rdb_tsdb_backend" + require "#{backend_dir}/stdout_backend" + end - config_param :segment_keys, :string, :default => nil - config_param :all_segment, :bool, :default => false config_param :value_key, :string, :default => 'value' - config_param :backend, :string + config_param :no_segment_keys, :bool, :default => false + config_param :only_segment_keys, :string, :default => nil + config_param :exclude_segment_keys, :string, :default => nil + config_param :update_mode_key, :string, :default => 'update_mode' + config_param :remove_tag_prefix, :string, :default => nil + config_param :add_tag_prefix, :string, :default => nil + config_param :backend, :string + def configure(conf) super if @remove_tag_prefix @remove_tag_prefix = Regexp.new('^' + Regexp.escape(@remove_tag_prefix) + "\\.?") end - if conf.has_key?('all_segment') && conf['all_segment'].empty? - @all_segment = true + @no_segment_keys = (conf.has_key?('no_segment_keys') && (conf['no_segment_keys'].empty? || conf['no_segment_keys'] == 'true')) + + if @only_segment_keys + @only_segment_keys = @only_segment_keys.strip.split(/\s*,\s*/) end - if @all_segment - @segment_keys = nil - elsif @segment_keys - @segment_keys = @segment_keys.strip.split(/\s*,\s*/) - else - @segment_keys = [] + if @exclude_segment_keys + @exclude_segment_keys = @exclude_segment_keys.strip.split(/\s*,\s*/) end be = BACKENDS[@backend] unless be raise ConfigError, "unknown backend: #{@backend.inspect}" @@ -83,76 +96,138 @@ super @backend.shutdown end def format_stream(tag, es) - out = '' + # modify tag tag = tag.sub(@remove_tag_prefix, '') if @remove_tag_prefix + tag = "#{add_tag_prefix}.#{tag}" if @add_tag_prefix + + out = '' es.each do |time,record| - value = record[@value_key] + # dup record to modify + record = record.dup - fv = value.to_f + # get value + value = record.delete(@value_key) + + # ignore record if value is invalid or 0 + begin + fv = value.to_f + rescue + next + end next if fv == 0.0 + # use integer if value.to_f == value.to_f.to_i iv = fv.to_i if iv.to_f == fv value = iv else value = fv end - seg_keys = @segment_keys - unless seg_keys - seg_keys = record.keys - seg_keys.delete(@value_key) + # get update_mode key + update_mode = record.delete(@update_mode_key) + case update_mode + when "latest" + update_mode = UpdateMode::LATEST + else + # default is add + update_mode = UpdateMode::ADD end - segs = [] - seg_keys.each {|seg_key| - if seg_val = record[seg_key] - segs << seg_key - segs << seg_val + # get segments + if @no_segment_keys + segments = {} + else + if @only_segment_keys + segments = {} + @only_segment_keys.each {|key| + if v = record[key] + segments[key] = v + end + } + else + segments = record end - } + if @exclude_segment_keys + @exclude_segment_keys.each {|key| + segments.delete(key) + } + end + end - [tag, time, value, segs].to_msgpack(out) + [tag, time, value, segments, update_mode].to_msgpack(out) end + out end - class SumAggregator + class AddUpdater def initialize @value = 0 end + attr_reader :value def add(value) @value += value end + def mode + UpdateMode::ADD + end + end + + class LatestUpdater + def initialize + @value = 0 + end attr_reader :value + + def add(value) + @value += value + end + + def mode + UpdateMode::LATEST + end end AggregationKey = Struct.new(:tag, :time, :seg_val, :seg_key) def write(chunk) counters = {} # select sum(value) from chunk group by tag, time/60, seg_val, seg_key - chunk.msgpack_each {|tag,time,value,segs| + chunk.msgpack_each {|tag,time,value,segments,update_mode| time = time / 60 * 60 + case update_mode + when UpdateMode::ADD + updater = AddUpdater + when UpdateMode::LATEST + updater = LatestUpdater + else # default is AddUpdater + updater = AddUpdater + end + + # total value ak = AggregationKey.new(tag, time, nil, nil) - (counters[ak] ||= SumAggregator.new).add(value) + (counters[ak] ||= updater.new).add(value) - segs.each_slice(2) {|seg_key,seg_val| + # segmented values + segments = Hash[segments] if segments.is_a?(Array) # for backward compat + + segments.each_pair {|seg_key,seg_val| ak = AggregationKey.new(tag, time, seg_val, seg_key) - (counters[ak] ||= SumAggregator.new).add(value) + (counters[ak] ||= updater.new).add(value) } } data = [] - counters.each_pair {|ak,aggr| - data << [ak.tag, ak.time, aggr.value, ak.seg_key, ak.seg_val] + counters.each_pair {|ak,up| + data << [ak.tag, ak.time, up.value, ak.seg_key, ak.seg_val, up.mode] } @backend.write(data) end end