lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.3 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.4

- old
+ new

@@ -16,21 +16,22 @@ :message else raise ConfigError, "out_elapsed_time: each should be 'es' or 'message'" end end + config_param :zero_emit, :bool, :default => false def initialize super @outputs = [] @elapsed = {} end # for test attr_reader :outputs - def elapsed(tag = :all) + def elapsed(tag = "elapsed") # default: @tag @elapsed[tag] ||= [] end def configure(conf) super @@ -82,42 +83,44 @@ Proc.new {|tag| @tag } else Proc.new {|tag| @tag_slice_proc.call(tag) } end - @push_elapsed_proc = - case @aggregate - when 'all' - Proc.new {|tag, elapsed_time| elapsed(:all) << elapsed_time } - when 'tag' - Proc.new {|tag, elapsed_time| elapsed(tag) << elapsed_time } - end - @emit_proc = if @each == :message chain = NullOutputChain.instance Proc.new {|tag, es| start = Time.now es.each do |time, record| @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) } finish = Time.now - elapsed = (finish - start).to_f - @push_elapsed_proc.call(@tag_proc.call(tag), elapsed) + emit_tag = @tag_proc.call(tag) + elapsed(emit_tag) << (finish - start).to_f start = finish end } else chain = NullOutputChain.instance Proc.new {|tag, es| t = Time.now @outputs.each {|output| output.emit(tag, es, chain) } - elapsed = (Time.now - t).to_f - @push_elapsed_proc.call(@tag_proc.call(tag), elapsed) + emit_tag = @tag_proc.call(tag) + elapsed(emit_tag) << (Time.now - t).to_f } end end + def initial_elapsed(prev_elapsed = nil) + return {} if !@zero_emit or prev_elapsed.nil? + elapsed = {} + prev_elapsed.keys.each do |tag| + next if prev_elapsed[tag].empty? # Prohibit to emit anymore + elapsed[tag] = [] + end + elapsed + end + def start @outputs.each {|o| o.start } @thread = Thread.new(&method(:run)) @@ -141,18 +144,18 @@ end end end def flush_emit - elapseds, @elapsed = @elapsed, {} - elapseds.each do |tag, elapsed| - unless elapsed.empty? - max = elapsed.max - num = elapsed.size - avg = elapsed.map(&:to_f).inject(:+) / num.to_f - Engine.emit(tag, Engine.now, {"max" => max, "avg" => avg, "num" => num}) - end + flushed_elapsed, @elapsed = @elapsed, initial_elapsed(@elapsed) + messages = {} + flushed_elapsed.each do |tag, elapsed| + num = elapsed.size + max = num == 0 ? 0 : elapsed.max + avg = num == 0 ? 0 : elapsed.map(&:to_f).inject(:+) / num.to_f + messages[tag] = {"max" => max, "avg" => avg, "num" => num} end + messages.each {|tag, message| Engine.emit(tag, Engine.now, message) } end def emit(tag, es, chain) @emit_proc.call(tag, es) chain.next