class Fluent::FlowCounterSimpleOutput < Fluent::Output Fluent::Plugin.register_output('flowcounter_simple', self) # To support log_level option implemented by Fluentd v0.10.43 unless method_defined?(:log) define_method("log") { $log } end config_param :indicator, :string, :default => 'num' config_param :unit, :string, :default => 'second' config_param :comment, :string, :default => nil attr_accessor :last_checked def configure(conf) super @indicator_proc = case @indicator when 'num' then Proc.new {|record| 1 } when 'byte' then Proc.new {|record| record.to_msgpack.size } else raise Fluent::ConfigError, "flowcounter-simple count allows num/byte" end @unit = case @unit when 'second' then :second when 'minute' then :minute when 'hour' then :hour when 'day' then :day else raise Fluent::ConfigError, "flowcounter-simple unit allows second/minute/hour/day" end @tick = case @unit when :second then 1 when :minute then 60 when :hour then 3600 when :day then 86400 else raise RuntimeError, "@unit must be one of second/minute/hour/day" end @output_proc = if @comment Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}\tcomment:#{@comment}" } else Proc.new {|count| "plugin:out_flowcounter_simple\tcount:#{count}\tindicator:#{@indicator}\tunit:#{@unit}" } end @count = 0 @mutex = Mutex.new end def start super start_watch end def shutdown super @watcher.terminate @watcher.join end def countup(count) @mutex.synchronize { @count = (@count || 0) + count } end def flush_emit(step) count, @count = @count, 0 if count > 0 log.info @output_proc.call(count) end end def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch)) end def watch # instance variable, and public accessable, for test @last_checked = Fluent::Engine.now while true sleep 0.1 if Fluent::Engine.now - @last_checked >= @tick now = Fluent::Engine.now flush_emit(now - @last_checked) @last_checked = now end end end def emit(tag, es, chain) count = 0 es.each {|time,record| count += @indicator_proc.call(record) } countup(count) chain.next end end