lib/fluent/plugin/out_flowcounter.rb in fluent-plugin-flowcounter-0.1.8 vs lib/fluent/plugin/out_flowcounter.rb in fluent-plugin-flowcounter-0.1.9
- old
+ new
@@ -3,10 +3,11 @@
class Fluent::FlowCounterOutput < Fluent::Output
Fluent::Plugin.register_output('flowcounter', self)
config_param :unit, :string, :default => 'minute'
config_param :aggregate, :string, :default => 'tag'
+ config_param :output_style, :string, :default => 'joined'
config_param :tag, :string, :default => 'flowcount'
config_param :input_tag_remove_prefix, :string, :default => nil
config_param :count_keys, :string
include Fluent::Mixin::ConfigPlaceholders
@@ -29,10 +30,19 @@
when 'tag' then :tag
when 'all' then :all
else
raise Fluent::ConfigError, "flowcounter aggregate allows tag/all"
end
+ @output_style = case @output_style
+ when 'joined' then :joined
+ when 'tagged' then :tagged
+ else
+ raise Fluent::ConfigError, "flowcounter output_style allows joined/tagged"
+ end
+ if @output_style == :tagged and @aggregate != :tag
+ raise Fluent::ConfigError, "flowcounter aggregate must be 'tag' when output_style is 'tagged'"
+ end
if @input_tag_remove_prefix
@removed_prefix_string = @input_tag_remove_prefix + '.'
@removed_length = @removed_prefix_string.length
end
@count_keys = @count_keys.split(',')
@@ -88,11 +98,31 @@
def flush(step)
flushed,@counts = @counts,count_initialized(@counts.keys)
generate_output(flushed, step)
end
+ def tagged_flush(step)
+ flushed,@counts = @counts,count_initialized(@counts.keys)
+ names = flushed.keys.select {|x| x.end_with?('_count')}.map {|x| x.chomp('_count')}
+ names.map {|name|
+ counts = {
+ 'count' => flushed[name + '_count'],
+ 'bytes' => flushed[name + '_bytes'],
+ }
+ data = generate_output(counts, step)
+ data['tag'] = name
+ data
+ }
+ end
+
def flush_emit(step)
- Fluent::Engine.emit(@tag, Fluent::Engine.now, flush(step))
+ if @output_style == :tagged
+ tagged_flush(step).each do |data|
+ Fluent::Engine.emit(@tag, Fluent::Engine.now, data)
+ end
+ else
+ Fluent::Engine.emit(@tag, Fluent::Engine.now, flush(step))
+ end
end
def start_watch
# for internal, or tests only
@watcher = Thread.new(&method(:watch))