Sha256: dc53838035821a9986863859edcfa021555b07424c7e39abf8e602f82ea37dfc
Contents?: true
Size: 1.54 KB
Versions: 1
Compression:
Stored size: 1.54 KB
Contents
module Fluent class SelectOutput < Fluent::Output Fluent::Plugin.register_output('select', self) config_param :select, :string config_param :add_prefix, :string, :default => nil config_param :tag, :string, :default => nil config_param :timeout, :time, :default => 1 def configure(conf) super if @add_prefix @mode = "add_prefix" elsif @tag @mode = "tag" else raise ConfigError, "Either add_prefix or tag is required " end end def emit(tag, es, chain) begin output_es = do_select(tag, es) if @mode == "add_prefix" Fluent::Engine::emit_stream(@add_prefix + "." + tag, output_es) else Fluent::Engine::emit_stream(@tag, output_es) end chain.next output_es #for test rescue SyntaxError => e chain.next $log.error "Select command is syntax error: #{@select}" e #for test end end def do_select(tag, es) output_es = MultiEventStream.new es.each {|time, record| timeout_block(tag, time, record){ if eval(@select) output_es.add(time, record) else $log.trace {"filtered: #{Time.at(time)} #{tag} #{record.inspect}"} end } } output_es end def timeout_block(tag, time, record) begin Timeout.timeout(@timeout){ yield } rescue Timeout::Error $log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"} end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-select-0.0.3.1 | lib/fluent/plugin/out_select.rb |