lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.7 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.8

- old
+ new

@@ -5,10 +5,15 @@ # To support log_level option implemented by Fluentd v0.10.43 unless method_defined?(:log) define_method("log") { $log } end + # Define `router` method of v0.12 to support v0.10 or earlier + unless method_defined?(:router) + define_method("router") { Fluent::Engine } + end + config_param :tag, :string, :default => 'elapsed' config_param :add_tag_prefix, :string, :default => nil config_param :remove_tag_prefix, :string, :default => nil config_param :remove_tag_slice, :string, :default => nil config_param :aggregate, :string, :default => 'all' @@ -27,10 +32,11 @@ def initialize super @outputs = [] @elapsed = {} + @emit_procs = [] end # for test attr_reader :outputs @@ -49,10 +55,16 @@ end log.debug "adding store type=#{type.dump}" output = Plugin.new_output(type) output.configure(e) + emit_proc = if output.respond_to?(:emit_events) + Proc.new {|output, tag, es, _chain| output.emit_events(tag, es)} + else + Proc.new {|output, tag, es, _chain| output.emit(tag, es, NullOutputChain.instance)} + end + @emit_procs << emit_proc @outputs << output } case @aggregate when 'all' @@ -80,22 +92,26 @@ def emit_message(tag, es) chain = NullOutputChain.instance start = Time.now es.each do |time, record| - @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) } + @outputs.each_with_index {|output, idx| + @emit_procs[idx].call(output, tag, OneEventStream.new(time, record), chain) + } finish = Time.now emit_tag = @tag_proc.call(tag) elapsed(emit_tag) << (finish - start).to_f start = finish end end def emit_es(tag, es) chain = NullOutputChain.instance t = Time.now - @outputs.each {|output| output.emit(tag, es, chain) } + @outputs.each_with_index {|output, idx| + @emit_procs[idx].call(output, tag, es,chain) + } emit_tag = @tag_proc.call(tag) elapsed(emit_tag) << (Time.now - t).to_f end def initial_elapsed(prev_elapsed = nil) @@ -141,10 +157,10 @@ num = elapsed.size max = num == 0 ? 0 : elapsed.max avg = num == 0 ? 0 : elapsed.map(&:to_f).inject(:+) / num.to_f messages[tag] = {"max" => max, "avg" => avg, "num" => num} end - messages.each {|tag, message| Engine.emit(tag, Engine.now, message) } + messages.each {|tag, message| router.emit(tag, Engine.now, message) } end private def tag_proc