Sha256: e79e7a4520ffb877a2e2345191feb09defaae016b8007cf629cf18cf7ea7c071

Contents?: true

Size: 1.93 KB

Versions: 1

Compression:

Stored size: 1.93 KB

Contents

# Output plugin that passes through one event out of every +interval+ events
# and re-emits it under a rewritten tag.
#
# Configuration parameters:
#   interval      - (integer, required) emit every Nth event; must not be 0.
#   sample_unit   - 'tag' (default) keeps one counter per incoming tag,
#                   'all' shares a single counter across every tag.
#   remove_prefix - (optional) prefix stripped from the incoming tag.
#   add_prefix    - prefix prepended to the outgoing tag (default 'sampled').
class Fluent::SamplingFilterOutput < Fluent::Output
  Fluent::Plugin.register_output('sampling_filter', self)

  config_param :interval, :integer
  config_param :sample_unit, :string, :default => 'tag'
  config_param :remove_prefix, :string, :default => nil
  config_param :add_prefix, :string, :default => 'sampled'

  # Validates the configuration and precomputes the prefix strings used for
  # tag rewriting.
  #
  # Raises Fluent::ConfigError when +interval+ is 0 or +sample_unit+ is
  # neither 'tag' nor 'all'.
  def configure(conf)
    super

    # A zero interval would raise ZeroDivisionError on the first event in
    # #emit, so reject it here where the error is attributable to the config.
    if @interval == 0
      raise Fluent::ConfigError, "interval must not be 0"
    end

    if @remove_prefix
      @removed_prefix_string = @remove_prefix + '.'
      @removed_length = @removed_prefix_string.length
    end
    @added_prefix_string = @add_prefix + '.'

    @sample_unit = case @sample_unit
                   when 'tag'
                     :tag
                   when 'all'
                     :all
                   else
                     raise Fluent::ConfigError, "sample_unit allows only 'tag' or 'all'"
                   end
    @counts = {}
  end

  # Rewrites +tag+ (strip remove_prefix, prepend add_prefix) and emits every
  # [time, record] pair in +time_record_pairs+ under the rewritten tag.
  def emit_sampled(tag, time_record_pairs)
    if @remove_prefix
      if tag == @remove_prefix
        # The tag is exactly the prefix (no trailing '.'), so nothing remains.
        # Slicing with @removed_length here would start past the end of the
        # string and yield nil, which breaks the length check below.
        tag = ''
      elsif tag.start_with?(@removed_prefix_string) && tag.length > @removed_length
        tag = tag[@removed_length..-1]
      end
    end
    tag = if tag.length > 0
            @added_prefix_string + tag
          else
            @add_prefix
          end

    time_record_pairs.each do |time, record|
      Fluent::Engine.emit(tag, time, record)
    end
  end

  # Counts each event in +es+ against the counter selected by sample_unit,
  # collects every event whose running count is a multiple of +interval+,
  # hands the collected events to #emit_sampled, then calls +chain.next+.
  def emit(tag, es, chain)
    key = @sample_unit == :all ? 'all' : tag

    # Access to @counts is deliberately not protected by a mutex because of
    # the cost of locking on every event. The code below is therefore not
    # thread safe; a lost update only skews the sampling counter slightly,
    # which is considered acceptable.
    pairs = []
    es.each do |time, record|
      c = (@counts[key] = @counts.fetch(key, 0) + 1)
      if c % @interval == 0
        pairs.push [time, record]
        # Reset only at a sampling point, once the counter exceeds 0x6fffffff,
        # so that it does not grow from Fixnum into Bignum.
        @counts[key] = 0 if c > 0x6fffffff
      end
    end
    emit_sampled(tag, pairs)

    chain.next
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-sampling-filter-0.1.1 lib/fluent/plugin/out_sampling_filter.rb