lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.2.5 vs lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.2.6

- old
+ new

@@ -226,14 +226,24 @@ def mode UpdateMode::AVERAGE end end + class SegmentedTotalUpdater < AddUpdater + def initialize(original_mode) + super() + @mode = original_mode + end + + attr_reader :mode + end + AggregationKey = Struct.new(:tag, :time, :seg_val, :seg_key) def write(chunk) - counters = {} + simple_counters = {} + segmented_counters = {} # select sum(value) from chunk group by tag, time/60, seg_val, seg_key chunk.msgpack_each {|tag,time,value,segments,update_mode| time = time / @aggregate_interval * @aggregate_interval @@ -246,24 +256,37 @@ updater = AverageUpdater else # default is AddUpdater updater = AddUpdater end - # total value - ak = AggregationKey.new(tag, time, nil, nil) - (counters[ak] ||= updater.new).add(value) + if segments.empty? + # simple values + ak = AggregationKey.new(tag, time, nil, nil) + (simple_counters[ak] ||= updater.new).add(value) + else + # segmented values + segments.each_pair {|seg_key,seg_val| + ak = AggregationKey.new(tag, time, seg_val, seg_key) + (segmented_counters[ak] ||= updater.new).add(value) + } + end + } - # segmented values - segments = Hash[segments] if segments.is_a?(Array) # for backward compat - - segments.each_pair {|seg_key,seg_val| - ak = AggregationKey.new(tag, time, seg_val, seg_key) - (counters[ak] ||= updater.new).add(value) - } + # calculate total value of segmented values + segmented_totals = {} + segmented_counters.each_pair {|ak,up| + ak = AggregationKey.new(ak.tag, ak.time, nil, nil) + (segmented_totals[ak] ||= SegmentedTotalUpdater.new(up.mode)).add(up.value) } + # simple_counters have higher priority than segmented_totals + counters = segmented_totals + counters.merge!(segmented_counters) + counters.merge!(simple_counters) + data = [] counters.each_pair {|ak,up| + p [ak.tag, ak.time, up.normalized_value(@normalize_factor), ak.seg_key, ak.seg_val, up.mode] data << [ak.tag, ak.time, up.normalized_value(@normalize_factor), ak.seg_key, ak.seg_val, up.mode] } @backend.write(data) end