lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.5 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.6
- old
+ new
@@ -61,61 +61,45 @@
raise ConfigError, "out_elapsed_time: `add_tag_prefix` or `remove_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? and @remove_tag_prefix.nil?
else
raise ConfigError, "out_elapsed_time: aggregate allows `tag` or `all`"
end
- @tag_slice_proc =
- if @remove_tag_slice
- lindex, rindex = @remove_tag_slice.split('..', 2)
- if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
- raise Fluent::ConfigError, "out_elapsed_time: remove_tag_slice must be formatted like [num]..[num]"
- end
- l, r = lindex.to_i, rindex.to_i
- Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') }
- else
- Proc.new {|tag| tag }
- end
+ @tag_proc = tag_proc
- @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix
- @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix
- @tag_proc =
- if @tag_prefix and @tag_prefix_match
- Proc.new {|tag| "#{@tag_prefix}#{lstrip(@tag_slice_proc.call(tag), @tag_prefix_match)}" }
- elsif @tag_prefix_match
- Proc.new {|tag| lstrip(@tag_slice_proc.call(tag), @tag_prefix_match) }
- elsif @tag_prefix
- Proc.new {|tag| "#{@tag_prefix}#{@tag_slice_proc.call(tag)}" }
- elsif @tag
- Proc.new {|tag| @tag }
- else
- Proc.new {|tag| @tag_slice_proc.call(tag) }
- end
-
@emit_proc =
if @each == :message
- chain = NullOutputChain.instance
- Proc.new {|tag, es|
- start = Time.now
- es.each do |time, record|
- @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) }
- finish = Time.now
- emit_tag = @tag_proc.call(tag)
- elapsed(emit_tag) << (finish - start).to_f
- start = finish
- end
- }
+ self.method(:emit_message)
else
- chain = NullOutputChain.instance
- Proc.new {|tag, es|
- t = Time.now
- @outputs.each {|output| output.emit(tag, es, chain) }
- emit_tag = @tag_proc.call(tag)
- elapsed(emit_tag) << (Time.now - t).to_f
- }
+ self.method(:emit_es)
end
end
+ def emit(tag, es, chain)
+ @emit_proc.call(tag, es)
+ chain.next
+ end
+
+ def emit_message(tag, es)
+ chain = NullOutputChain.instance
+ start = Time.now
+ es.each do |time, record|
+ @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) }
+ finish = Time.now
+ emit_tag = @tag_proc.call(tag)
+ elapsed(emit_tag) << (finish - start).to_f
+ start = finish
+ end
+ end
+
+ def emit_es(tag, es)
+ chain = NullOutputChain.instance
+ t = Time.now
+ @outputs.each {|output| output.emit(tag, es, chain) }
+ emit_tag = @tag_proc.call(tag)
+ elapsed(emit_tag) << (Time.now - t).to_f
+ end
+
def initial_elapsed(prev_elapsed = nil)
return {} if !@zero_emit or prev_elapsed.nil?
elapsed = {}
prev_elapsed.keys.each do |tag|
next if prev_elapsed[tag].empty? # Prohibit to emit anymore
@@ -160,15 +144,41 @@
messages[tag] = {"max" => max, "avg" => avg, "num" => num}
end
messages.each {|tag, message| Engine.emit(tag, Engine.now, message) }
end
- def emit(tag, es, chain)
- @emit_proc.call(tag, es)
- chain.next
- end
+ private
- def lstrip(string, substring)
- string.index(substring) == 0 ? string[substring.size..-1] : string
+ def tag_proc
+ tag_slice_proc =
+ if @remove_tag_slice
+ lindex, rindex = @remove_tag_slice.split('..', 2)
+ if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
+ raise Fluent::ConfigError, "out_elapsed_time: remove_tag_slice must be formatted like [num]..[num]"
+ end
+ l, r = lindex.to_i, rindex.to_i
+ Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') }
+ else
+ Proc.new {|tag| tag }
+ end
+
+ rstrip = Proc.new {|str, substr| str.chomp(substr) }
+ lstrip = Proc.new {|str, substr| str.start_with?(substr) ? str[substr.size..-1] : str }
+ tag_prefix = "#{rstrip.call(@add_tag_prefix, '.')}." if @add_tag_prefix
+ tag_suffix = ".#{lstrip.call(@add_tag_suffix, '.')}" if @add_tag_suffix
+ tag_prefix_match = "#{rstrip.call(@remove_tag_prefix, '.')}." if @remove_tag_prefix
+ tag_suffix_match = ".#{lstrip.call(@remove_tag_suffix, '.')}" if @remove_tag_suffix
+ tag_fixed = @tag if @tag
+ if tag_prefix_match and tag_suffix_match
+ Proc.new {|tag| "#{tag_prefix}#{rstrip.call(lstrip.call(tag_slice_proc.call(tag), tag_prefix_match), tag_suffix_match)}#{tag_suffix}" }
+ elsif tag_prefix_match
+ Proc.new {|tag| "#{tag_prefix}#{lstrip.call(tag_slice_proc.call(tag), tag_prefix_match)}#{tag_suffix}" }
+ elsif tag_suffix_match
+ Proc.new {|tag| "#{tag_prefix}#{rstrip.call(tag_slice_proc.call(tag), tag_suffix_match)}#{tag_suffix}" }
+ elsif tag_prefix || @remove_tag_slice || tag_suffix
+ Proc.new {|tag| "#{tag_prefix}#{tag_slice_proc.call(tag)}#{tag_suffix}" }
+ else
+ Proc.new {|tag| tag_fixed }
+ end
end
end
end