Sha256: a8549f11b0e01e706ef2133359dd7c3e7a178e6b77f5b3074025fe239ad10962

Contents?: true

Size: 1.78 KB

Versions: 2

Compression:

Stored size: 1.78 KB

Contents

class Fluent::SamplingFilter < Fluent::Filter
  Fluent::Plugin.register_filter('sampling_filter', self)

  config_param :interval, :integer
  config_param :sample_unit, :string, default: 'tag'
  config_param :minimum_rate_per_min, :integer, default: nil

  def configure(conf)
    super

    @sample_unit = case @sample_unit
                   when 'tag'
                     :tag
                   when 'all'
                     :all
                   else
                     raise Fluent::ConfigError, "sample_unit allows only 'tag' or 'all'"
                   end
    @counts = {}
    @resets = {} if @minimum_rate_per_min
  end

  def filter_stream(tag, es)
    t = if @sample_unit == :all
          'all'
        else
          tag
        end

    new_es = Fluent::MultiEventStream.new

    # Access to @counts SHOULD be protected by mutex, with a heavy penalty.
    # Code below is not thread safe, but @counts (counter for sampling rate) is not
    # so serious value (and probably will not be broken...),
    # then i let here as it is now.
    if @minimum_rate_per_min
      unless @resets[t]
        @resets[t] = Fluent::Engine.now + (60 - rand(30))
      end
      if Fluent::Engine.now > @resets[t]
        @resets[t] = Fluent::Engine.now + 60
        @counts[t] = 0
      end
      es.each do |time,record|
        c = (@counts[t] = @counts.fetch(t, 0) + 1)
        if c < @minimum_rate_per_min or c % @interval == 0
          new_es.add(time, record.dup)
        end
      end
    else
      es.each do |time,record|
        c = (@counts[t] = @counts.fetch(t, 0) + 1)
        if c % @interval == 0
          new_es.add(time, record.dup)
          # reset only just before @counts[t] is to be bignum from fixnum
          @counts[t] = 0 if c > 0x6fffffff
        end
      end
    end
    new_es
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluent-plugin-sampling-filter-0.2.1 lib/fluent/plugin/filter_sampling.rb
fluent-plugin-sampling-filter-0.2.0 lib/fluent/plugin/filter_sampling.rb