Sha256: 4d2cfe8305f1da3ba7655cd06c3feda96bc82e2151418cc163f72f486f23072a

Contents?: true

Size: 1.73 KB

Versions: 1

Compression:

Stored size: 1.73 KB

Contents

require 'fluent/plugin/unit_time_filter_buffer'

# Fluentd output plugin registered under the name 'unit_time_filter'.
#
# At configure time it evaluates the Ruby file at +filter_path+; the value of
# that evaluation must be a Proc. Incoming events are handed to a Buffer
# (presumably defined in 'fluent/plugin/unit_time_filter_buffer' -- confirm)
# together with that Proc and the remaining settings. What the Buffer does
# with them is defined there, not in this class.
class Fluent::UnitTimeFilterOutput < Fluent::Output
  Fluent::Plugin.register_output('unit_time_filter', self)

  # Path of the Ruby file whose evaluation result is the filter Proc (required).
  config_param :filter_path,       :type => :string
  # The settings below are forwarded unchanged to Buffer.new; their exact
  # meaning is determined by Buffer.
  config_param :unit_sec,          :type => :size,   :default => 1
  config_param :prefix,            :type => :string, :default => 'filtered'
  config_param :emit_each_tag,     :type => :bool,   :default => false
  config_param :pass_hash_row,     :type => :bool,   :default => false
  config_param :hash_row_time_key, :type => :string, :default => 'time'
  config_param :hash_row_tag_key,  :type => :string, :default => 'tag'

  # Key under which the Buffer is cached in Thread.current.
  BUFFER_KEY = :unit_time_filter_buffer

  # Validates the configuration and loads the filter Proc.
  #
  # @param conf the configuration element handed over by Fluentd
  # @raise [Fluent::ConfigError] if the filter file does not exist, cannot be
  #   read or evaluated, or does not evaluate to a Proc
  def configure(conf)
    super

    unless File.exist?(@filter_path)
      raise Fluent::ConfigError, "No such file: #{@filter_path}"
    end

    begin
      # Passing the path as the eval filename makes errors and backtraces
      # raised from the filter point at the real file instead of "(eval)".
      @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
    rescue StandardError, ScriptError => e
      # SyntaxError and LoadError descend from ScriptError, not StandardError,
      # so a bare `rescue` would let a malformed filter file escape without
      # being reported as a configuration error.
      raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
    end

    unless @filter.kind_of?(Proc)
      raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: Filter must be Proc"
    end
  end

  # Drops the Buffer cached for the thread that runs shutdown.
  #
  # NOTE(review): only the calling thread's slot is cleared; a Buffer cached
  # by a different thread is left in place.
  def shutdown
    super
    Thread.current[BUFFER_KEY] = nil
  end

  # Returns the Buffer cached in Thread.current, creating it on first use
  # from the filter Proc and the configured options.
  #
  # NOTE(review): BUFFER_KEY is a class-level constant, so every instance of
  # this plugin that emits on the same thread shares whichever Buffer was
  # created first (including its filter and options) -- confirm this is
  # intended when several unit_time_filter outputs are configured.
  def buffer
    Thread.current[BUFFER_KEY] ||= Buffer.new({
      :filter            => @filter,
      :unit_sec          => @unit_sec,
      :prefix            => @prefix,
      :emit_each_tag     => @emit_each_tag,
      :pass_hash_row     => @pass_hash_row,
      :hash_row_time_key => @hash_row_time_key,
      :hash_row_tag_key  => @hash_row_tag_key,
    })
  end

  # Hands the tag and event stream to the buffer, then advances the chain.
  #
  # @param tag the tag of the incoming events
  # @param es the incoming event stream
  # @param chain the output chain; +next+ is called after buffering
  def emit(tag, es, chain)
    buffer.resume(tag, es)
    chain.next
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-unit-time-filter-0.1.0 lib/fluent/plugin/out_unit_time_filter.rb