lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.2 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.3

- old
+ new

@@ -1,10 +1,14 @@ module Fluent class ElapsedTimeOutput < MultiOutput Plugin.register_output('elapsed_time', self) config_param :tag, :string, :default => 'elapsed' + config_param :add_tag_prefix, :string, :default => nil + config_param :remove_tag_prefix, :string, :default => nil + config_param :remove_tag_slice, :string, :default => nil + config_param :aggregate, :string, :default => 'all' config_param :interval, :time, :default => 60 config_param :each, :default => :es do |val| case val.downcase when 'es' :es @@ -16,15 +20,20 @@ end def initialize super @outputs = [] - @elapsed = [] + @elapsed = {} end - attr_reader :outputs, :elapsed + # for test + attr_reader :outputs + def elapsed(tag = :all) + @elapsed[tag] ||= [] + end + def configure(conf) super conf.elements.select {|e| e.name == 'store' }.each {|e| @@ -37,28 +46,74 @@ output = Plugin.new_output(type) output.configure(e) @outputs << output } + case @aggregate + when 'all' + raise ConfigError, "out_elapsed_time: `tag` must be specified with aggregate all" if @tag.nil? + when 'tag' + raise ConfigError, "out_elapsed_time: `add_tag_prefix` or `remove_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? and @remove_tag_prefix.nil? + else + raise ConfigError, "out_elapsed_time: aggregate allows `tag` or `all`" + end + + @tag_slice_proc = + if @remove_tag_slice + lindex, rindex = @remove_tag_slice.split('..', 2) + if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ + raise Fluent::ConfigError, "out_elapsed_time: remove_tag_slice must be formatted like [num]..[num]" + end + l, r = lindex.to_i, rindex.to_i + Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') } + else + Proc.new {|tag| tag } + end + + @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix + @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix + @tag_proc = + if @tag_prefix and @tag_prefix_match + Proc.new {|tag| "#{@tag_prefix}#{lstrip(@tag_slice_proc.call(tag), @tag_prefix_match)}" } + elsif @tag_prefix_match + Proc.new {|tag| lstrip(@tag_slice_proc.call(tag), @tag_prefix_match) } + elsif @tag_prefix + Proc.new {|tag| "#{@tag_prefix}#{@tag_slice_proc.call(tag)}" } + elsif @tag + Proc.new {|tag| @tag } + else + Proc.new {|tag| @tag_slice_proc.call(tag) } + end + + @push_elapsed_proc = + case @aggregate + when 'all' + Proc.new {|tag, elapsed_time| elapsed(:all) << elapsed_time } + when 'tag' + Proc.new {|tag, elapsed_time| elapsed(tag) << elapsed_time } + end + @emit_proc = if @each == :message chain = NullOutputChain.instance Proc.new {|tag, es| start = Time.now es.each do |time, record| @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) } finish = Time.now - @elapsed << (finish - start).to_f + elapsed = (finish - start).to_f + @push_elapsed_proc.call(@tag_proc.call(tag), elapsed) start = finish end } else chain = NullOutputChain.instance Proc.new {|tag, es| t = Time.now @outputs.each {|output| output.emit(tag, es, chain) } - @elapsed << (Time.now - t).to_f + elapsed = (Time.now - t).to_f + @push_elapsed_proc.call(@tag_proc.call(tag), elapsed) } end end def start @@ -86,20 +141,26 @@ end end end def flush_emit - elapsed, @elapsed = @elapsed, [] - unless elapsed.empty? - max = elapsed.max - num = elapsed.size - avg = elapsed.map(&:to_f).inject(:+) / num.to_f - Engine.emit(@tag, Engine.now, {"max" => max, "avg" => avg, "num" => num}) + elapseds, @elapsed = @elapsed, {} + elapseds.each do |tag, elapsed| + unless elapsed.empty? + max = elapsed.max + num = elapsed.size + avg = elapsed.map(&:to_f).inject(:+) / num.to_f + Engine.emit(tag, Engine.now, {"max" => max, "avg" => avg, "num" => num}) + end end end def emit(tag, es, chain) @emit_proc.call(tag, es) chain.next + end + + def lstrip(string, substring) + string.index(substring) == 0 ? string[substring.size..-1] : string end end end