Sha256: 01342cb953de7a5b8ffab70efd96cb39b1b367a9a5c2ee180b2a095ec867fc43

Contents?: true

Size: 1.48 KB

Versions: 1

Compression:

Stored size: 1.48 KB

Contents

require 'fluent/plugin/filter'
require 'fluent/clock'

# Fluentd filter that passes through only a sample of the records it sees.
#
# Records are counted per bucket: the tag, or a single shared 'all' bucket
# when `sample_unit all` is set. Every `interval`-th record of a bucket is
# emitted and the rest are dropped. When `minimum_rate_per_min` is set, the
# first `minimum_rate_per_min` records of each bucket in every counting
# window are also emitted, regardless of `interval`.
class Fluent::Plugin::SamplingFilter < Fluent::Plugin::Filter
  Fluent::Plugin.register_filter('sampling_filter', self)

  # Emit one record out of every `interval` records (must be >= 1).
  config_param :interval, :integer
  # Count per tag (:tag) or across all tags together (:all).
  config_param :sample_unit, :enum, list: [:tag, :all], default: :tag
  # When set, always emit at least this many records per bucket per window.
  config_param :minimum_rate_per_min, :integer, default: nil

  # Counter value at which filter_simple wraps its per-bucket counter to 0.
  COUNTER_LIMIT = 0x6fffffff
  # Length of a minimum-rate counting window, in seconds (Fluent::Clock units).
  RATE_WINDOW = 60
  # The first window of a bucket is shortened by rand(INITIAL_JITTER) seconds.
  INITIAL_JITTER = 30

  # Validates the configuration and initialises the per-bucket state.
  #
  # @param conf [Fluent::Config::Element] plugin configuration
  # @raise [Fluent::ConfigError] if interval is not a positive integer
  def configure(conf)
    super

    # `c % 0` would raise ZeroDivisionError on every record, and a negative
    # interval has no meaning; reject both at startup instead.
    if @interval < 1
      raise Fluent::ConfigError, "sampling_filter: interval must be a positive integer, got #{@interval}"
    end

    @counts = {}
    @resets = {} if @minimum_rate_per_min
  end

  # NOTE: @counts and @resets are mutated without a mutex, so this filter is
  # not thread-safe. Guarding them would add locking cost to every record.
  # A lost or duplicated increment only nudges the effective sampling rate,
  # so the race is tolerated deliberately.

  # Fluentd filter hook.
  #
  # @param tag [String] event tag
  # @param _time event time (unused)
  # @param record [Hash] event record
  # @return the record (or a copy of it) if it is sampled, nil to drop it
  def filter(tag, _time, record)
    bucket = @sample_unit == :all ? 'all' : tag
    if @minimum_rate_per_min
      filter_with_minimum_rate(bucket, record)
    else
      filter_simple(bucket, record)
    end
  end

  # Plain sampling: emit every @interval-th record of bucket `t`.
  #
  # @param t [String] bucket key (tag or 'all')
  # @param record [Hash] event record
  # @return record when sampled, otherwise nil
  def filter_simple(t, record)
    c = (@counts[t] = @counts.fetch(t, 0) + 1)
    # Keep the counter bounded by wrapping it to 0. The modulo check below
    # still uses the pre-wrap value `c`.
    @counts[t] = 0 if c > COUNTER_LIMIT
    record if (c % @interval).zero?
  end

  # Sampling with a per-window floor: within each window, the first
  # @minimum_rate_per_min records of bucket `t` always pass; after that only
  # every @interval-th record passes. The counter resets when the window
  # deadline expires.
  #
  # @param t [String] bucket key (tag or 'all')
  # @param record [Hash] event record
  # @return a copy of record when emitted, otherwise nil
  def filter_with_minimum_rate(t, record)
    now = Fluent::Clock.now
    # The first deadline falls 31-60s out (rand(30) is 0..29), presumably so
    # buckets first seen together do not all reset at the same instant.
    @resets[t] ||= now + (RATE_WINDOW - rand(INITIAL_JITTER))
    if now > @resets[t]
      @resets[t] = now + RATE_WINDOW
      @counts[t] = 0
    end
    c = (@counts[t] = @counts.fetch(t, 0) + 1)
    # `<=` (not `<`) so that exactly @minimum_rate_per_min records per window
    # are guaranteed to pass; counts start at 1.
    # NOTE(review): this path returns a copy while filter_simple returns the
    # original record; the reason is not visible here. Confirm before unifying.
    if c <= @minimum_rate_per_min || (c % @interval).zero?
      record.dup
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-sampling-filter-1.0.0 lib/fluent/plugin/filter_sampling.rb