lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.3.1 vs lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.3.2

- old
+ new

@@ -260,33 +260,37 @@ simple_counters = {} segmented_counters = {} # select sum(value) from chunk group by tag, time/60, seg_val, seg_key chunk.msgpack_each {|tag,time,value,segments,update_mode| - time = time / @aggregate_interval * @aggregate_interval + begin + time = time / @aggregate_interval * @aggregate_interval - case update_mode - when UpdateMode::ADD - updater = AddUpdater - when UpdateMode::MAX - updater = MaxUpdater - when UpdateMode::AVERAGE # AVERAGE uses MaxUpdater and calculate average on server-side aggregation - updater = AverageUpdater - when UpdateMode::COUNT - updater = CountUpdater - else # default is AddUpdater - updater = AddUpdater - end + case update_mode + when UpdateMode::ADD + updater = AddUpdater + when UpdateMode::MAX + updater = MaxUpdater + when UpdateMode::AVERAGE # AVERAGE uses MaxUpdater and calculate average on server-side aggregation + updater = AverageUpdater + when UpdateMode::COUNT + updater = CountUpdater + else # default is AddUpdater + updater = AddUpdater + end - # simple values - ak = AggregationKey.new(tag, time, nil, nil) - (simple_counters[ak] ||= updater.new).add(value) + # simple values + ak = AggregationKey.new(tag, time, nil, nil) + (simple_counters[ak] ||= updater.new).add(value) - # segmented values - segments.each_pair {|seg_key,seg_val| - ak = AggregationKey.new(tag, time, seg_val, seg_key) - (segmented_counters[ak] ||= updater.new).add(value) - } + # segmented values + segments.each_pair {|seg_key,seg_val| + ak = AggregationKey.new(tag, time, seg_val, seg_key) + (segmented_counters[ak] ||= updater.new).add(value) + } + rescue StandardError => e + log.warn("ignoring broken chunk: #{e.inspect} - " + [tag, time, value, segments, update_mode].inspect) + end } counters = segmented_counters counters.merge!(simple_counters)