lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.3.1 vs lib/fluent/plugin/out_metricsense.rb in fluent-plugin-metricsense-0.3.2
- old
+ new
@@ -260,33 +260,37 @@
simple_counters = {}
segmented_counters = {}
# select sum(value) from chunk group by tag, time/60, seg_val, seg_key
chunk.msgpack_each {|tag,time,value,segments,update_mode|
- time = time / @aggregate_interval * @aggregate_interval
+ begin
+ time = time / @aggregate_interval * @aggregate_interval
- case update_mode
- when UpdateMode::ADD
- updater = AddUpdater
- when UpdateMode::MAX
- updater = MaxUpdater
- when UpdateMode::AVERAGE # AVERAGE uses MaxUpdater and calculate average on server-side aggregation
- updater = AverageUpdater
- when UpdateMode::COUNT
- updater = CountUpdater
- else # default is AddUpdater
- updater = AddUpdater
- end
+ case update_mode
+ when UpdateMode::ADD
+ updater = AddUpdater
+ when UpdateMode::MAX
+ updater = MaxUpdater
+ when UpdateMode::AVERAGE # AVERAGE uses MaxUpdater and calculate average on server-side aggregation
+ updater = AverageUpdater
+ when UpdateMode::COUNT
+ updater = CountUpdater
+ else # default is AddUpdater
+ updater = AddUpdater
+ end
- # simple values
- ak = AggregationKey.new(tag, time, nil, nil)
- (simple_counters[ak] ||= updater.new).add(value)
+ # simple values
+ ak = AggregationKey.new(tag, time, nil, nil)
+ (simple_counters[ak] ||= updater.new).add(value)
- # segmented values
- segments.each_pair {|seg_key,seg_val|
- ak = AggregationKey.new(tag, time, seg_val, seg_key)
- (segmented_counters[ak] ||= updater.new).add(value)
- }
+ # segmented values
+ segments.each_pair {|seg_key,seg_val|
+ ak = AggregationKey.new(tag, time, seg_val, seg_key)
+ (segmented_counters[ak] ||= updater.new).add(value)
+ }
+ rescue StandardError => e
+ log.warn("ignoring broken chunk: #{e.inspect} - " + [tag, time, value, segments, update_mode].inspect)
+ end
}
counters = segmented_counters
counters.merge!(simple_counters)