lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.1.1 vs lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.2.0
- old
+ new
@@ -21,50 +21,63 @@
class MetricSenseOutput < Fluent::BufferedOutput
Fluent::Plugin.register_output('metricsense', self)
BACKENDS = {}
+ def self.register_backend(name, klass)
+ BACKENDS[name] = klass
+ end
+
+ module UpdateMode
+ ADD = 0
+ LATEST = 1
+ end
+
class Backend
+ UpdateMode = MetricSenseOutput::UpdateMode
include Configurable
-
def start
end
-
def shutdown
end
end
- backend_dir = "#{File.dirname(__FILE__)}/backend"
- Dir.glob("#{backend_dir}/*_backend.rb") {|e|
- require e
- }
+ module Backends
+ backend_dir = "#{File.dirname(__FILE__)}/backends"
+ require "#{backend_dir}/librato_backend"
+ require "#{backend_dir}/rdb_tsdb_backend"
+ require "#{backend_dir}/stdout_backend"
+ end
- config_param :segment_keys, :string, :default => nil
- config_param :all_segment, :bool, :default => false
config_param :value_key, :string, :default => 'value'
- config_param :backend, :string
+ config_param :no_segment_keys, :bool, :default => false
+ config_param :only_segment_keys, :string, :default => nil
+ config_param :exclude_segment_keys, :string, :default => nil
+ config_param :update_mode_key, :string, :default => 'update_mode'
+
config_param :remove_tag_prefix, :string, :default => nil
+ config_param :add_tag_prefix, :string, :default => nil
+ config_param :backend, :string
+
def configure(conf)
super
if @remove_tag_prefix
@remove_tag_prefix = Regexp.new('^' + Regexp.escape(@remove_tag_prefix) + "\\.?")
end
- if conf.has_key?('all_segment') && conf['all_segment'].empty?
- @all_segment = true
+ @no_segment_keys = (conf.has_key?('no_segment_keys') && (conf['no_segment_keys'].empty? || conf['no_segment_keys'] == 'true'))
+
+ if @only_segment_keys
+ @only_segment_keys = @only_segment_keys.strip.split(/\s*,\s*/)
end
- if @all_segment
- @segment_keys = nil
- elsif @segment_keys
- @segment_keys = @segment_keys.strip.split(/\s*,\s*/)
- else
- @segment_keys = []
+ if @exclude_segment_keys
+ @exclude_segment_keys = @exclude_segment_keys.strip.split(/\s*,\s*/)
end
be = BACKENDS[@backend]
unless be
raise ConfigError, "unknown backend: #{@backend.inspect}"
@@ -83,76 +96,138 @@
super
@backend.shutdown
end
def format_stream(tag, es)
- out = ''
+ # modify tag
tag = tag.sub(@remove_tag_prefix, '') if @remove_tag_prefix
+ tag = "#{add_tag_prefix}.#{tag}" if @add_tag_prefix
+
+ out = ''
es.each do |time,record|
- value = record[@value_key]
+ # dup record to modify
+ record = record.dup
- fv = value.to_f
+ # get value
+ value = record.delete(@value_key)
+
+ # ignore record if value is invalid or 0
+ begin
+ fv = value.to_f
+ rescue
+ next
+ end
next if fv == 0.0
+ # use integer if value.to_f == value.to_f.to_i
iv = fv.to_i
if iv.to_f == fv
value = iv
else
value = fv
end
- seg_keys = @segment_keys
- unless seg_keys
- seg_keys = record.keys
- seg_keys.delete(@value_key)
+ # get update_mode key
+ update_mode = record.delete(@update_mode_key)
+ case update_mode
+ when "latest"
+ update_mode = UpdateMode::LATEST
+ else
+ # default is add
+ update_mode = UpdateMode::ADD
end
- segs = []
- seg_keys.each {|seg_key|
- if seg_val = record[seg_key]
- segs << seg_key
- segs << seg_val
+ # get segments
+ if @no_segment_keys
+ segments = {}
+ else
+ if @only_segment_keys
+ segments = {}
+ @only_segment_keys.each {|key|
+ if v = record[key]
+ segments[key] = v
+ end
+ }
+ else
+ segments = record
end
- }
+ if @exclude_segment_keys
+ @exclude_segment_keys.each {|key|
+ segments.delete(key)
+ }
+ end
+ end
- [tag, time, value, segs].to_msgpack(out)
+ [tag, time, value, segments, update_mode].to_msgpack(out)
end
+
out
end
- class SumAggregator
+ class AddUpdater
def initialize
@value = 0
end
+ attr_reader :value
def add(value)
@value += value
end
+ def mode
+ UpdateMode::ADD
+ end
+ end
+
+ class LatestUpdater
+ def initialize
+ @value = 0
+ end
attr_reader :value
+
+ def add(value)
+ @value += value
+ end
+
+ def mode
+ UpdateMode::LATEST
+ end
end
AggregationKey = Struct.new(:tag, :time, :seg_val, :seg_key)
def write(chunk)
counters = {}
# select sum(value) from chunk group by tag, time/60, seg_val, seg_key
- chunk.msgpack_each {|tag,time,value,segs|
+ chunk.msgpack_each {|tag,time,value,segments,update_mode|
time = time / 60 * 60
+ case update_mode
+ when UpdateMode::ADD
+ updater = AddUpdater
+ when UpdateMode::LATEST
+ updater = LatestUpdater
+ else # default is AddUpdater
+ updater = AddUpdater
+ end
+
+ # total value
ak = AggregationKey.new(tag, time, nil, nil)
- (counters[ak] ||= SumAggregator.new).add(value)
+ (counters[ak] ||= updater.new).add(value)
- segs.each_slice(2) {|seg_key,seg_val|
+ # segmented values
+ segments = Hash[segments] if segments.is_a?(Array) # for backward compat
+
+ segments.each_pair {|seg_key,seg_val|
ak = AggregationKey.new(tag, time, seg_val, seg_key)
- (counters[ak] ||= SumAggregator.new).add(value)
+ (counters[ak] ||= updater.new).add(value)
}
}
data = []
- counters.each_pair {|ak,aggr|
- data << [ak.tag, ak.time, aggr.value, ak.seg_key, ak.seg_val]
+ counters.each_pair {|ak,up|
+ data << [ak.tag, ak.time, up.value, ak.seg_key, ak.seg_val, up.mode]
}
@backend.write(data)
end
end