lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.2 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.3
- old
+ new
@@ -1,10 +1,14 @@
module Fluent
class ElapsedTimeOutput < MultiOutput
Plugin.register_output('elapsed_time', self)
config_param :tag, :string, :default => 'elapsed'
+ config_param :add_tag_prefix, :string, :default => nil
+ config_param :remove_tag_prefix, :string, :default => nil
+ config_param :remove_tag_slice, :string, :default => nil
+ config_param :aggregate, :string, :default => 'all'
config_param :interval, :time, :default => 60
config_param :each, :default => :es do |val|
case val.downcase
when 'es'
:es
@@ -16,15 +20,20 @@
end
def initialize
super
@outputs = []
- @elapsed = []
+ @elapsed = {}
end
- attr_reader :outputs, :elapsed
+ # for test
+ attr_reader :outputs
+ def elapsed(tag = :all)
+ @elapsed[tag] ||= []
+ end
+
def configure(conf)
super
conf.elements.select {|e|
e.name == 'store'
}.each {|e|
@@ -37,28 +46,74 @@
output = Plugin.new_output(type)
output.configure(e)
@outputs << output
}
+ case @aggregate
+ when 'all'
+ raise ConfigError, "out_elapsed_time: `tag` must be specified with aggregate all" if @tag.nil?
+ when 'tag'
+ raise ConfigError, "out_elapsed_time: `add_tag_prefix` or `remove_tag_prefix` must be specified with aggregate tag" if @add_tag_prefix.nil? and @remove_tag_prefix.nil?
+ else
+ raise ConfigError, "out_elapsed_time: aggregate allows `tag` or `all`"
+ end
+
+ @tag_slice_proc =
+ if @remove_tag_slice
+ lindex, rindex = @remove_tag_slice.split('..', 2)
+ if lindex.nil? or rindex.nil? or lindex !~ /^-?\d+$/ or rindex !~ /^-?\d+$/
+ raise Fluent::ConfigError, "out_elapsed_time: remove_tag_slice must be formatted like [num]..[num]"
+ end
+ l, r = lindex.to_i, rindex.to_i
+ Proc.new {|tag| (tags = tag.split('.')[l..r]).nil? ? "" : tags.join('.') }
+ else
+ Proc.new {|tag| tag }
+ end
+
+ @tag_prefix = "#{@add_tag_prefix}." if @add_tag_prefix
+ @tag_prefix_match = "#{@remove_tag_prefix}." if @remove_tag_prefix
+ @tag_proc =
+ if @tag_prefix and @tag_prefix_match
+ Proc.new {|tag| "#{@tag_prefix}#{lstrip(@tag_slice_proc.call(tag), @tag_prefix_match)}" }
+ elsif @tag_prefix_match
+ Proc.new {|tag| lstrip(@tag_slice_proc.call(tag), @tag_prefix_match) }
+ elsif @tag_prefix
+ Proc.new {|tag| "#{@tag_prefix}#{@tag_slice_proc.call(tag)}" }
+ elsif @tag
+ Proc.new {|tag| @tag }
+ else
+ Proc.new {|tag| @tag_slice_proc.call(tag) }
+ end
+
+ @push_elapsed_proc =
+ case @aggregate
+ when 'all'
+ Proc.new {|tag, elapsed_time| elapsed(:all) << elapsed_time }
+ when 'tag'
+ Proc.new {|tag, elapsed_time| elapsed(tag) << elapsed_time }
+ end
+
@emit_proc =
if @each == :message
chain = NullOutputChain.instance
Proc.new {|tag, es|
start = Time.now
es.each do |time, record|
@outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) }
finish = Time.now
- @elapsed << (finish - start).to_f
+ elapsed = (finish - start).to_f
+ @push_elapsed_proc.call(@tag_proc.call(tag), elapsed)
start = finish
end
}
else
chain = NullOutputChain.instance
Proc.new {|tag, es|
t = Time.now
@outputs.each {|output| output.emit(tag, es, chain) }
- @elapsed << (Time.now - t).to_f
+ elapsed = (Time.now - t).to_f
+ @push_elapsed_proc.call(@tag_proc.call(tag), elapsed)
}
end
end
def start
@@ -86,20 +141,26 @@
end
end
end
def flush_emit
- elapsed, @elapsed = @elapsed, []
- unless elapsed.empty?
- max = elapsed.max
- num = elapsed.size
- avg = elapsed.map(&:to_f).inject(:+) / num.to_f
- Engine.emit(@tag, Engine.now, {"max" => max, "avg" => avg, "num" => num})
+ elapseds, @elapsed = @elapsed, {}
+ elapseds.each do |tag, elapsed|
+ unless elapsed.empty?
+ max = elapsed.max
+ num = elapsed.size
+ avg = elapsed.map(&:to_f).inject(:+) / num.to_f
+ Engine.emit(tag, Engine.now, {"max" => max, "avg" => avg, "num" => num})
+ end
end
end
def emit(tag, es, chain)
@emit_proc.call(tag, es)
chain.next
+ end
+
+ def lstrip(string, substring)
+ string.index(substring) == 0 ? string[substring.size..-1] : string
end
end
end