lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.5 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.6

- old
+ new

@@ -61,61 +61,45 @@ raise ConfigError, "out_elapsed_time: `add_tag_prefix` or `remove_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? and @remove_tag_prefix.nil? else raise ConfigError, "out_elapsed_time: aggregate allows `tag` or `all`" end - @tag_slice_proc = - if @remove_tag_slice - lindex, rindex = @remove_tag_slice.split('..', 2) - if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ - raise Fluent::ConfigError, "out_elapsed_time: remove_tag_slice must be formatted like [num]..[num]" - end - l, r = lindex.to_i, rindex.to_i - Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') } - else - Proc.new {|tag| tag } - end + @tag_proc = tag_proc - @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix - @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix - @tag_proc = - if @tag_prefix and @tag_prefix_match - Proc.new {|tag| "#{@tag_prefix}#{lstrip(@tag_slice_proc.call(tag), @tag_prefix_match)}" } - elsif @tag_prefix_match - Proc.new {|tag| lstrip(@tag_slice_proc.call(tag), @tag_prefix_match) } - elsif @tag_prefix - Proc.new {|tag| "#{@tag_prefix}#{@tag_slice_proc.call(tag)}" } - elsif @tag - Proc.new {|tag| @tag } - else - Proc.new {|tag| @tag_slice_proc.call(tag) } - end - @emit_proc = if @each == :message - chain = NullOutputChain.instance - Proc.new {|tag, es| - start = Time.now - es.each do |time, record| - @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) } - finish = Time.now - emit_tag = @tag_proc.call(tag) - elapsed(emit_tag) << (finish - start).to_f - start = finish - end - } + self.method(:emit_message) else - chain = NullOutputChain.instance - Proc.new {|tag, es| - t = Time.now - @outputs.each {|output| output.emit(tag, es, chain) } - emit_tag = @tag_proc.call(tag) - elapsed(emit_tag) << (Time.now - t).to_f - } + self.method(:emit_es) end end + def emit(tag, es, chain) + @emit_proc.call(tag, es) + chain.next + end + + def emit_message(tag, es) + chain = NullOutputChain.instance + start = Time.now + es.each do |time, record| + @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) } + finish = Time.now + emit_tag = @tag_proc.call(tag) + elapsed(emit_tag) << (finish - start).to_f + start = finish + end + end + + def emit_es(tag, es) + chain = NullOutputChain.instance + t = Time.now + @outputs.each {|output| output.emit(tag, es, chain) } + emit_tag = @tag_proc.call(tag) + elapsed(emit_tag) << (Time.now - t).to_f + end + def initial_elapsed(prev_elapsed = nil) return {} if !@zero_emit or prev_elapsed.nil? elapsed = {} prev_elapsed.keys.each do |tag| next if prev_elapsed[tag].empty? # Prohibit to emit anymore @@ -160,15 +144,41 @@ messages[tag] = {"max" => max, "avg" => avg, "num" => num} end messages.each {|tag, message| Engine.emit(tag, Engine.now, message) } end - def emit(tag, es, chain) - @emit_proc.call(tag, es) - chain.next - end + private - def lstrip(string, substring) - string.index(substring) == 0 ? string[substring.size..-1] : string + def tag_proc + tag_slice_proc = + if @remove_tag_slice + lindex, rindex = @remove_tag_slice.split('..', 2) + if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/ + raise Fluent::ConfigError, "out_elapsed_time: remove_tag_slice must be formatted like [num]..[num]" + end + l, r = lindex.to_i, rindex.to_i + Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') } + else + Proc.new {|tag| tag } + end + + rstrip = Proc.new {|str, substr| str.chomp(substr) } + lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str } + tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix + tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix + tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix + tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix + tag_fixed = @tag if @tag + if tag_prefix_match and tag_suffix_match + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag_slice_proc.call(tag), tag_prefix_match), tag_suffix_match)}#{tag_suffix}" } + elsif tag_prefix_match + Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag_slice_proc.call(tag), tag_prefix_match)}#{tag_suffix}" } + elsif tag_suffix_match + Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag_slice_proc.call(tag), tag_suffix_match)}#{tag_suffix}" } + elsif tag_prefix || @remove_tag_slice || tag_suffix + Proc.new {|tag| "#{tag_prefix}#{tag_slice_proc.call(tag)}#{tag_suffix}" } + else + Proc.new {|tag| tag_fixed } + end end end end