lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.2.5 vs lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.2.6
- old
+ new
@@ -226,14 +226,24 @@
def mode
UpdateMode::AVERAGE
end
end
+ class SegmentedTotalUpdater < AddUpdater
+ def initialize(original_mode)
+ super()
+ @mode = original_mode
+ end
+
+ attr_reader :mode
+ end
+
AggregationKey = Struct.new(:tag, :time, :seg_val, :seg_key)
def write(chunk)
- counters = {}
+ simple_counters = {}
+ segmented_counters = {}
# select sum(value) from chunk group by tag, time/60, seg_val, seg_key
chunk.msgpack_each {|tag,time,value,segments,update_mode|
time = time / @aggregate_interval * @aggregate_interval
@@ -246,24 +256,37 @@
updater = AverageUpdater
else # default is AddUpdater
updater = AddUpdater
end
- # total value
- ak = AggregationKey.new(tag, time, nil, nil)
- (counters[ak] ||= updater.new).add(value)
+ if segments.empty?
+ # simple values
+ ak = AggregationKey.new(tag, time, nil, nil)
+ (simple_counters[ak] ||= updater.new).add(value)
+ else
+ # segmented values
+ segments.each_pair {|seg_key,seg_val|
+ ak = AggregationKey.new(tag, time, seg_val, seg_key)
+ (segmented_counters[ak] ||= updater.new).add(value)
+ }
+ end
+ }
- # segmented values
- segments = Hash[segments] if segments.is_a?(Array) # for backward compat
-
- segments.each_pair {|seg_key,seg_val|
- ak = AggregationKey.new(tag, time, seg_val, seg_key)
- (counters[ak] ||= updater.new).add(value)
- }
+ # calculate total value of segmented values
+ segmented_totals = {}
+ segmented_counters.each_pair {|ak,up|
+ ak = AggregationKey.new(ak.tag, ak.time, nil, nil)
+ (segmented_totals[ak] ||= SegmentedTotalUpdater.new(up.mode)).add(up.value)
}
+ # simple_counters have higher priority than segmented_totals
+ counters = segmented_totals
+ counters.merge!(segmented_counters)
+ counters.merge!(simple_counters)
+
data = []
counters.each_pair {|ak,up|
+ p [ak.tag, ak.time, up.normalized_value(@normalize_factor), ak.seg_key, ak.seg_val, up.mode]
data << [ak.tag, ak.time, up.normalized_value(@normalize_factor), ak.seg_key, ak.seg_val, up.mode]
}
@backend.write(data)
end