lib/fluent/plugin/out_flowcounter.rb in fluent-plugin-flowcounter-0.1.8 vs lib/fluent/plugin/out_flowcounter.rb in fluent-plugin-flowcounter-0.1.9

- old
+ new

@@ -3,10 +3,11 @@ class Fluent::FlowCounterOutput < Fluent::Output Fluent::Plugin.register_output('flowcounter', self) config_param :unit, :string, :default => 'minute' config_param :aggregate, :string, :default => 'tag' + config_param :output_style, :string, :default => 'joined' config_param :tag, :string, :default => 'flowcount' config_param :input_tag_remove_prefix, :string, :default => nil config_param :count_keys, :string include Fluent::Mixin::ConfigPlaceholders @@ -29,10 +30,19 @@ when 'tag' then :tag when 'all' then :all else raise Fluent::ConfigError, "flowcounter aggregate allows tag/all" end + @output_style = case @output_style + when 'joined' then :joined + when 'tagged' then :tagged + else + raise Fluent::ConfigError, "flowcounter output_style allows joined/tagged" + end + if @output_style == :tagged and @aggregate != :tag + raise Fluent::ConfigError, "flowcounter aggregate must be 'tag' when output_style is 'tagged'" + end if @input_tag_remove_prefix @removed_prefix_string = @input_tag_remove_prefix + '.' @removed_length = @removed_prefix_string.length end @count_keys = @count_keys.split(',') @@ -88,11 +98,31 @@ def flush(step) flushed,@counts = @counts,count_initialized(@counts.keys) generate_output(flushed, step) end + def tagged_flush(step) + flushed,@counts = @counts,count_initialized(@counts.keys) + names = flushed.keys.select {|x| x.end_with?('_count')}.map {|x| x.chomp('_count')} + names.map {|name| + counts = { + 'count' => flushed[name + '_count'], + 'bytes' => flushed[name + '_bytes'], + } + data = generate_output(counts, step) + data['tag'] = name + data + } + end + def flush_emit(step) - Fluent::Engine.emit(@tag, Fluent::Engine.now, flush(step)) + if @output_style == :tagged + tagged_flush(step).each do |data| + Fluent::Engine.emit(@tag, Fluent::Engine.now, data) + end + else + Fluent::Engine.emit(@tag, Fluent::Engine.now, flush(step)) + end end def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch))