lib/fluent/plugin/out_select.rb in fluent-plugin-select-0.0.2 vs lib/fluent/plugin/out_select.rb in fluent-plugin-select-0.0.3

- old
+ new

@@ -2,32 +2,62 @@ module Fluent class SelectOutput < Fluent::Output Fluent::Plugin.register_output('select', self) config_param :select, :string - config_param :add_prefix, :string - config_param :timeout, :integer, :default => 1 + config_param :add_prefix, :string, :default => nil + config_param :tag, :string, :default => nil + config_param :timeout, :time, :default => 1 + def configure(conf) + super + if @add_prefix + @mode = "add_prefix" + elsif @tag + @mode = "tag" + else + raise ConfigError, "Either add_prefix or tag is required " + end + end + def emit(tag, es, chain) begin - time_records = [] - es.each {|time, record| - if eval(@select) - $log.trace {"hoge"} - time_records << [time, record] - else - $log.trace {"filtered: #{Time.at(time)} #{tag} #{record.inspect}"} - end - } - time_records.each do |time, record| - Fluent::Engine::emit(@add_prefix + "." + tag, time, record) + output_es = do_select(tag, es) + if @mode == "add_prefix" + Fluent::Engine::emit_stream(@add_prefix + "." + tag, output_es) + else + Fluent::Engine::emit_stream(@tag, output_es) end chain.next - time_records #for test + output_es #for test rescue SyntaxError => e chain.next $log.error "Select command is syntax error: #{@select}" e #for test + end + end + + def do_select(tag, es) + output_es = MultiEventStream.new + es.each {|time, record| + timeout_block{ + if eval(@select) + output_es.add(time, record) + else + $log.trace {"filtered: #{Time.at(time)} #{tag} #{record.inspect}"} + end + } + } + output_es + end + + def timeout_block + begin + Timeout.timeout(@timeout){ + yield + } + rescue Timeout::Error + $log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"} end end end end