Sha256: 01342cb953de7a5b8ffab70efd96cb39b1b367a9a5c2ee180b2a095ec867fc43
Contents?: true
Size: 1.48 KB
Versions: 1
Compression:
Stored size: 1.48 KB
Contents
require 'fluent/plugin/filter'
require 'fluent/clock'
require 'fluent/config/error'

# Fluentd filter plugin that thins out an event stream: a record is passed
# when its running count is a multiple of `interval`, and dropped otherwise.
#
# Configuration parameters:
#   interval             - Integer, required. Sampling divisor; must not be 0.
#   sample_unit          - :tag (default) keeps one counter per tag,
#                          :all shares a single counter under the key 'all'.
#   minimum_rate_per_min - Integer or nil (default nil). When set, records
#                          whose count within the current window is below
#                          this value pass regardless of `interval`
#                          (see #filter_with_minimum_rate).
class Fluent::Plugin::SamplingFilter < Fluent::Plugin::Filter
  Fluent::Plugin.register_filter('sampling_filter', self)

  config_param :interval, :integer
  config_param :sample_unit, :enum, list: [:tag, :all], default: :tag
  config_param :minimum_rate_per_min, :integer, default: nil

  # Validates the configuration and initialises the counter state.
  #
  # @param conf [Object] configuration, handed to the superclass unchanged
  # @raise [Fluent::ConfigError] if `interval` is 0
  def configure(conf)
    super

    # Both filter paths evaluate `c % @interval`; with interval 0 that raises
    # ZeroDivisionError for every single record, so reject it up front.
    raise Fluent::ConfigError, "'interval' must not be 0" if @interval == 0

    @counts = {}
    # Reset deadlines are only needed in minimum-rate mode.
    @resets = {} if @minimum_rate_per_min
  end

  # Access to @counts SHOULD be protected by a mutex, but that carries a heavy
  # performance penalty. The code below is not thread safe; @counts is only a
  # sampling counter whose exact value is not critical, so it is deliberately
  # left unsynchronized.

  # Decides whether one event is kept or dropped.
  #
  # @param tag [Object] event tag; used as the counter key unless sample_unit is :all
  # @param _time [Object] event time (unused)
  # @param record [Object] event record (presumably a Hash — only #dup is called on it)
  # @return [Object, nil] the record (or a copy of it) when kept, nil when dropped
  def filter(tag, _time, record)
    t = @sample_unit == :all ? 'all' : tag
    if @minimum_rate_per_min
      filter_with_minimum_rate(t, record)
    else
      filter_simple(t, record)
    end
  end

  # Plain 1-in-`interval` sampling.
  #
  # @param t [Object] counter key
  # @param record [Object] event record
  # @return [Object, nil] the same record object when the incremented count is
  #   a multiple of `interval`, otherwise nil
  def filter_simple(t, record)
    c = (@counts[t] = @counts.fetch(t, 0) + 1)
    # Reset the stored counter once it exceeds 0x6fffffff so it never grows
    # into a bignum. The decision below still uses the pre-reset value `c`.
    # NOTE(review): the restart at 0 shifts the sampling phase unless
    # 0x70000000 is a multiple of `interval` — confirm this is acceptable.
    @counts[t] = 0 if c > 0x6fffffff
    if c % @interval == 0
      record
    else
      nil
    end
  end

  # Sampling with a per-window floor: within each window the counter restarts,
  # and records counted below `minimum_rate_per_min` pass unconditionally.
  #
  # @param t [Object] counter key
  # @param record [Object] event record
  # @return [Object, nil] a shallow copy (`record.dup`) when kept, otherwise nil
  def filter_with_minimum_rate(t, record)
    # First deadline for a key is 31..60 seconds from now (rand(30) is 0..29);
    # presumably the jitter keeps different keys from resetting together.
    @resets[t] ||= Fluent::Clock.now + (60 - rand(30))
    if Fluent::Clock.now > @resets[t]
      # Deadline passed: start a fresh 60-second window with a zeroed counter.
      @resets[t] = Fluent::Clock.now + 60
      @counts[t] = 0
    end
    c = (@counts[t] = @counts.fetch(t, 0) + 1)
    # NOTE(review): the strict `<` lets only minimum_rate_per_min - 1 records
    # through unconditionally per window; confirm whether `<=` was intended.
    if c < @minimum_rate_per_min || c % @interval == 0
      record.dup
    else
      nil
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-sampling-filter-1.0.0 | lib/fluent/plugin/filter_sampling.rb |