Sha256: 5d1df9e1239b0ab2039d2c75750a91eb4556c54b125beb211c2cd3092c99d218
Contents?: true
Size: 1.65 KB
Versions: 1
Compression:
Stored size: 1.65 KB
Contents
# coding: utf-8 # class Fluent::UniqueCounterOutput < Fluent::Output Fluent::Plugin.register_output('unique_counter', self) config_param :count_interval, :time, :default => 60 config_param :unit, :string, :default => nil config_param :unique_key, :string config_param :tag, :string, :default => 'unique_count' attr_accessor :tick attr_accessor :counts attr_accessor :last_checked UNITS = { minutes: 60, hours: 3600, days: 86400 } def configure(conf) super if @unit and UNITS[@unit.to_sym].nil? raise Fluent::ConfigError, "'unit' must be one of minutes/hours/days" end @count_interval = UNITS[@unit.to_sym] if @unit @counts = [] @mutex = Mutex.new end def start super start_watch end def shutdown super @watcher.terminate @watcher.join end def flush_emit flushed,@counts = @counts,[] message = {'unique_count' => flushed.uniq.count } Fluent::Engine.emit(@tag, Fluent::Engine.now, message) end def start_watch # for internal, or tests only @watcher = Thread.new(&method(:watch)) end def watch # instance variable, and public accessable, for test @last_checked = Fluent::Engine.now while true sleep 0.5 if (Fluent::Engine.now - @last_checked) >= @count_interval now = Fluent::Engine.now flush_emit @last_checked = now end end end def emit(tag, es, chain) c = [] es.each do |time,record| value = record[@unique_key] next if value.nil? c << value.to_s.force_encoding('ASCII-8BIT') end @mutex.synchronize { @counts += c } chain.next end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-unique-counter-0.1.3 | lib/fluent/plugin/out_unique_counter.rb |