require 'fluent/input'

module Fluent
  # Input plugin registered as `measure_time`. Configuring it mixes
  # MeasureTimable into every Fluent::Input and Fluent::Output so that each
  # plugin can pick up its own `measure_time` config element.
  class MeasureTimeInput < Input
    Plugin.register_input('measure_time', self)

    # Installs the MeasureTimable hook on the Input and Output base classes.
    #
    # @param conf the configuration element of this plugin (not read here)
    def configure(conf)
      ::Fluent::Input.__send__(:include, MeasureTimable)
      ::Fluent::Output.__send__(:include, MeasureTimable)
    end
  end

  # Mixin that wraps a plugin's `configure` so that a nested `measure_time`
  # element creates and configures a MeasureTime instance for that plugin.
  module MeasureTimable
    # Aliases `configure` to `configure_with_measure_time`, keeping the
    # original reachable as `configure_without_measure_time`. The
    # `method_defined?` check keeps a repeated include from aliasing twice.
    def self.included(klass)
      unless klass.method_defined?(:configure_without_measure_time)
        klass.__send__(:alias_method, :configure_without_measure_time, :configure)
        klass.__send__(:alias_method, :configure, :configure_with_measure_time)
      end
    end

    attr_reader :measure_time

    # Runs the plugin's original `configure`, then sets up time measurement
    # when the plugin's config contains a `measure_time` element.
    def configure_with_measure_time(conf)
      configure_without_measure_time(conf)
      if element = conf.elements.find { |child| child.name == 'measure_time' }
        @measure_time = MeasureTime.new(self, log)
        @measure_time.configure(element)
      end
    end
  end

  # Measures the elapsed time of one method (the "hook") of one plugin
  # instance, and either emits every measurement immediately or emits
  # max/avg/num statistics once per `interval` seconds.
  class MeasureTime
    attr_reader :plugin, :log, :times, :mutex, :thread, :tag, :interval, :hook

    # @param plugin the plugin instance whose method is measured
    # @param log    the logger used for debug and warning output
    def initialize(plugin, log)
      @plugin = plugin
      @klass = @plugin.class
      @log = log
      @times = []
      @mutex = Mutex.new
    end

    # Reads `tag` (default 'measure_time'), `hook` (required) and `interval`
    # (optional) from the config element, then installs the hook.
    #
    # @raise [Fluent::ConfigError] when `hook` is not given
    def configure(conf)
      @tag = conf['tag'] || 'measure_time'
      unless @hook = conf['hook']
        # NOTE(review): the reviewed text read "in directive"; the element
        # name looks stripped as markup and was restored - confirm wording.
        raise Fluent::ConfigError, '`hook` option must be specified in <measure_time></measure_time> directive'
      end
      @hook_msg = {:class => @klass.to_s, :hook => @hook.to_s, :object_id => @plugin.object_id.to_s}
      @interval = conf['interval'].to_i if conf['interval']
      @add_or_emit_proc =
        if @interval
          # add to calculate statistics in each interval
          Proc.new {|elapsed|
            @mutex.synchronize { @times << elapsed }
          }
        else
          # emit information immediately
          Proc.new {|elapsed|
            msg = {:time => elapsed}.merge(@hook_msg)
            ::Fluent::Engine.emit(@tag, ::Fluent::Engine.now, msg)
          }
        end
      apply_hook
    end

    # Defines singleton methods on the plugin instance that wrap the hooked
    # method with timing and tie this object's start/stop to the plugin's.
    #
    # NOTE(review): the heredoc body was missing from the reviewed text (it
    # looks like everything from the heredoc opener to the `>=` in the polling
    # loop was stripped as markup). The body below is a reconstruction -
    # confirm against the upstream plugin before merging.
    def apply_hook
      @plugin.instance_eval <<-EOF
        def #{@hook}(*args)
          measure_time.measure_time do
            super
          end
        end
        def start
          super
          measure_time.start
        end
        def stop
          super
          measure_time.stop
        end
      EOF
    end

    # Times the given block and hands the elapsed seconds to
    # @add_or_emit_proc; returns the block's own result.
    #
    # NOTE(review): reconstructed - this method was inside the missing span.
    def measure_time
      started = Time.now
      output = yield
      elapsed = (Time.now - started).to_f
      log.debug "elapsed time at #{@klass}##{@hook} is #{elapsed} sec"
      @add_or_emit_proc.call(elapsed)
      output
    end

    # Starts the polling thread; does nothing without `interval`.
    #
    # NOTE(review): reconstructed - this method was inside the missing span.
    def start
      return unless @interval
      @stop_flag = false
      @last_checked = Time.now
      @thread = Thread.new(&method(:run))
    end

    # Asks the polling thread to stop and waits for it; does nothing
    # without `interval`.
    #
    # NOTE(review): reconstructed - this method was inside the missing span.
    def stop
      return unless @interval
      @stop_flag = true
      @thread.run # wake the thread so it sees the stop flag without waiting out its sleep
      @thread.join
      @thread = nil
    end

    # Polling loop: flushes the collected times once `interval` seconds have
    # passed since the last flush. Errors are logged as warnings so that one
    # failed flush does not end the loop.
    #
    # NOTE(review): only the tail of this method (from `>= @interval`
    # onwards) was present in the reviewed text; the head is reconstructed.
    def run
      @last_checked ||= Engine.now
      while (sleep 0.5)
        break if @stop_flag
        begin
          now = Engine.now
          if now - @last_checked >= @interval
            flush(now)
            @last_checked = now
          end
        rescue => e
          log.warn "in_measure_time: hook #{@klass}##{@hook} #{e.class} #{e.message} #{e.backtrace.first}"
        end
      end
    end

    # Emits max/avg/num of the times collected since the previous flush and
    # clears the buffer.
    #
    # @param now the event time to emit with
    # @return [Array, nil] the emitted [tag, time, record] triple, or nil
    #   when nothing was collected
    def flush(now)
      # Copy and clear under the lock so concurrent measurements are not lost.
      times = @mutex.synchronize do
        snapshot = @times.dup
        @times.clear
        snapshot
      end
      return nil if times.empty?

      num = times.size
      max = times.max
      avg = times.map(&:to_f).inject(:+) / num.to_f
      triple = [@tag, now, {:max => max, :avg => avg, :num => num}.merge(@hook_msg)]
      Engine.emit(*triple)
      triple
    end
  end
end