lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.3 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.4
- old
+ new
@@ -16,21 +16,22 @@
:message
else
raise ConfigError, "out_elapsed_time: each should be 'es' or 'message'"
end
end
+ config_param :zero_emit, :bool, :default => false
def initialize
super
@outputs = []
@elapsed = {}
end
# for test
attr_reader :outputs
- def elapsed(tag = :all)
+ def elapsed(tag = "elapsed") # default: @tag
@elapsed[tag] ||= []
end
def configure(conf)
super
@@ -82,42 +83,44 @@
Proc.new {|tag| @tag }
else
Proc.new {|tag| @tag_slice_proc.call(tag) }
end
- @push_elapsed_proc =
- case @aggregate
- when 'all'
- Proc.new {|tag, elapsed_time| elapsed(:all) << elapsed_time }
- when 'tag'
- Proc.new {|tag, elapsed_time| elapsed(tag) << elapsed_time }
- end
-
@emit_proc =
if @each == :message
chain = NullOutputChain.instance
Proc.new {|tag, es|
start = Time.now
es.each do |time, record|
@outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) }
finish = Time.now
- elapsed = (finish - start).to_f
- @push_elapsed_proc.call(@tag_proc.call(tag), elapsed)
+ emit_tag = @tag_proc.call(tag)
+ elapsed(emit_tag) << (finish - start).to_f
start = finish
end
}
else
chain = NullOutputChain.instance
Proc.new {|tag, es|
t = Time.now
@outputs.each {|output| output.emit(tag, es, chain) }
- elapsed = (Time.now - t).to_f
- @push_elapsed_proc.call(@tag_proc.call(tag), elapsed)
+ emit_tag = @tag_proc.call(tag)
+ elapsed(emit_tag) << (Time.now - t).to_f
}
end
end
+ def initial_elapsed(prev_elapsed = nil)
+ return {} if !@zero_emit or prev_elapsed.nil?
+ elapsed = {}
+ prev_elapsed.keys.each do |tag|
+ next if prev_elapsed[tag].empty? # Prohibit to emit anymore
+ elapsed[tag] = []
+ end
+ elapsed
+ end
+
def start
@outputs.each {|o|
o.start
}
@thread = Thread.new(&method(:run))
@@ -141,18 +144,18 @@
end
end
end
def flush_emit
- elapseds, @elapsed = @elapsed, {}
- elapseds.each do |tag, elapsed|
- unless elapsed.empty?
- max = elapsed.max
- num = elapsed.size
- avg = elapsed.map(&:to_f).inject(:+) / num.to_f
- Engine.emit(tag, Engine.now, {"max" => max, "avg" => avg, "num" => num})
- end
+ flushed_elapsed, @elapsed = @elapsed, initial_elapsed(@elapsed)
+ messages = {}
+ flushed_elapsed.each do |tag, elapsed|
+ num = elapsed.size
+ max = num == 0 ? 0 : elapsed.max
+ avg = num == 0 ? 0 : elapsed.map(&:to_f).inject(:+) / num.to_f
+ messages[tag] = {"max" => max, "avg" => avg, "num" => num}
end
+ messages.each {|tag, message| Engine.emit(tag, Engine.now, message) }
end
def emit(tag, es, chain)
@emit_proc.call(tag, es)
chain.next