lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.7 vs lib/fluent/plugin/out_elapsed_time.rb in fluent-plugin-elapsed-time-0.0.8
- old
+ new
@@ -5,10 +5,15 @@
# To support log_level option implemented by Fluentd v0.10.43
unless method_defined?(:log)
define_method("log") { $log }
end
+ # Define `router` method of v0.12 to support v0.10 or earlier
+ unless method_defined?(:router)
+ define_method("router") { Fluent::Engine }
+ end
+
config_param :tag, :string, :default => 'elapsed'
config_param :add_tag_prefix, :string, :default => nil
config_param :remove_tag_prefix, :string, :default => nil
config_param :remove_tag_slice, :string, :default => nil
config_param :aggregate, :string, :default => 'all'
@@ -27,10 +32,11 @@
def initialize
super
@outputs = []
@elapsed = {}
+ @emit_procs = []
end
# for test
attr_reader :outputs
@@ -49,10 +55,16 @@
end
log.debug "adding store type=#{type.dump}"
output = Plugin.new_output(type)
output.configure(e)
+ emit_proc = if output.respond_to?(:emit_events)
+ Proc.new {|output, tag, es, _chain| output.emit_events(tag, es)}
+ else
+ Proc.new {|output, tag, es, _chain| output.emit(tag, es, NullOutputChain.instance)}
+ end
+ @emit_procs << emit_proc
@outputs << output
}
case @aggregate
when 'all'
@@ -80,22 +92,26 @@
def emit_message(tag, es)
chain = NullOutputChain.instance
start = Time.now
es.each do |time, record|
- @outputs.each {|output| output.emit(tag, OneEventStream.new(time, record), chain) }
+ @outputs.each_with_index {|output, idx|
+ @emit_procs[idx].call(output, tag, OneEventStream.new(time, record), chain)
+ }
finish = Time.now
emit_tag = @tag_proc.call(tag)
elapsed(emit_tag) << (finish - start).to_f
start = finish
end
end
def emit_es(tag, es)
chain = NullOutputChain.instance
t = Time.now
- @outputs.each {|output| output.emit(tag, es, chain) }
+ @outputs.each_with_index {|output, idx|
+ @emit_procs[idx].call(output, tag, es,chain)
+ }
emit_tag = @tag_proc.call(tag)
elapsed(emit_tag) << (Time.now - t).to_f
end
def initial_elapsed(prev_elapsed = nil)
@@ -141,10 +157,10 @@
num = elapsed.size
max = num == 0 ? 0 : elapsed.max
avg = num == 0 ? 0 : elapsed.map(&:to_f).inject(:+) / num.to_f
messages[tag] = {"max" => max, "avg" => avg, "num" => num}
end
- messages.each {|tag, message| Engine.emit(tag, Engine.now, message) }
+ messages.each {|tag, message| router.emit(tag, Engine.now, message) }
end
private
def tag_proc