Sha256: 4d2cfe8305f1da3ba7655cd06c3feda96bc82e2151418cc163f72f486f23072a
Contents?: true
Size: 1.73 KB
Versions: 1
Compression:
Stored size: 1.73 KB
Contents
require 'fluent/plugin/unit_time_filter_buffer'

# Fluentd output plugin registered under the name +unit_time_filter+.
#
# At configure time the Ruby file named by +filter_path+ is evaluated and must
# produce a Proc. Every event stream passed to #emit is handed to a Buffer that
# is created lazily, once per calling thread, from that Proc and the plugin's
# configuration parameters.
#
# NOTE(review): +Buffer+ is not defined in this file; it is presumably provided
# by the required 'fluent/plugin/unit_time_filter_buffer' -- confirm.
class Fluent::UnitTimeFilterOutput < Fluent::Output
  Fluent::Plugin.register_output('unit_time_filter', self)

  # Path of the Ruby file whose evaluation result is used as the filter Proc.
  config_param :filter_path,       :type => :string
  config_param :unit_sec,          :type => :size,   :default => 1
  config_param :prefix,            :type => :string, :default => 'filtered'
  config_param :emit_each_tag,     :type => :bool,   :default => false
  config_param :pass_hash_row,     :type => :bool,   :default => false
  config_param :hash_row_time_key, :type => :string, :default => 'time'
  config_param :hash_row_tag_key,  :type => :string, :default => 'tag'

  # Thread-local slot in which each thread's Buffer is kept.
  BUFFER_KEY = :unit_time_filter_buffer

  # Loads and validates the filter file.
  #
  # The file is executed as Ruby code via +instance_eval+ on a fresh Object,
  # so it must come from a trusted source.
  #
  # @param conf the configuration object forwarded to +super+
  # @raise [Fluent::ConfigError] if the file does not exist, fails to
  #   evaluate (including syntax errors), or does not evaluate to a Proc
  def configure(conf)
    super

    unless File.exist?(@filter_path)
      raise Fluent::ConfigError, "No such file: #{@filter_path}"
    end

    begin
      # Passing the path makes errors and backtraces raised from the filter
      # point at the real file and line instead of "(eval)".
      @filter = Object.new.instance_eval(File.read(@filter_path), @filter_path)
    rescue StandardError, ScriptError => e
      # SyntaxError/LoadError descend from ScriptError, not StandardError, so
      # a bare `rescue` would let a malformed filter file escape unwrapped.
      raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: #{e}"
    end

    unless @filter.kind_of?(Proc)
      raise Fluent::ConfigError, "Invalid filter: #{@filter_path}: Filter must be Proc"
    end
  end

  # Clears the Buffer stored on the calling thread.
  #
  # Only the current thread's slot is reset; buffers created on other threads
  # are not touched by this method.
  def shutdown
    super
    Thread.current[BUFFER_KEY] = nil
  end

  # Returns the calling thread's Buffer, creating it on first use.
  #
  # @return [Buffer] the buffer bound to the current thread
  def buffer
    Thread.current[BUFFER_KEY] ||= Buffer.new({
      :filter            => @filter,
      :unit_sec          => @unit_sec,
      :prefix            => @prefix,
      :emit_each_tag     => @emit_each_tag,
      :pass_hash_row     => @pass_hash_row,
      :hash_row_time_key => @hash_row_time_key,
      :hash_row_tag_key  => @hash_row_tag_key,
    })
  end

  # Passes the event stream to the current thread's Buffer, then advances the
  # output chain.
  #
  # @param tag the tag of the incoming events
  # @param es the event stream
  # @param chain the output chain; +next+ is called on it after buffering
  def emit(tag, es, chain)
    buffer.resume(tag, es)
    chain.next
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
fluent-plugin-unit-time-filter-0.1.0 | lib/fluent/plugin/out_unit_time_filter.rb |