lib/fluent/plugin/out_select.rb in fluent-plugin-select-0.0.2 vs lib/fluent/plugin/out_select.rb in fluent-plugin-select-0.0.3
- old
+ new
@@ -2,32 +2,62 @@
module Fluent
class SelectOutput < Fluent::Output
Fluent::Plugin.register_output('select', self)
config_param :select, :string
- config_param :add_prefix, :string
- config_param :timeout, :integer, :default => 1
+ config_param :add_prefix, :string, :default => nil
+ config_param :tag, :string, :default => nil
+ config_param :timeout, :time, :default => 1
+ def configure(conf)
+ super
+ if @add_prefix
+ @mode = "add_prefix"
+ elsif @tag
+ @mode = "tag"
+ else
+ raise ConfigError, "Either add_prefix or tag is required "
+ end
+ end
+
def emit(tag, es, chain)
begin
- time_records = []
- es.each {|time, record|
- if eval(@select)
- $log.trace {"hoge"}
- time_records << [time, record]
- else
- $log.trace {"filtered: #{Time.at(time)} #{tag} #{record.inspect}"}
- end
- }
- time_records.each do |time, record|
- Fluent::Engine::emit(@add_prefix + "." + tag, time, record)
+ output_es = do_select(tag, es)
+ if @mode == "add_prefix"
+ Fluent::Engine::emit_stream(@add_prefix + "." + tag, output_es)
+ else
+ Fluent::Engine::emit_stream(@tag, output_es)
end
chain.next
- time_records #for test
+ output_es #for test
rescue SyntaxError => e
chain.next
$log.error "Select command is syntax error: #{@select}"
e #for test
+ end
+ end
+
+ def do_select(tag, es)
+ output_es = MultiEventStream.new
+ es.each {|time, record|
+ timeout_block{
+ if eval(@select)
+ output_es.add(time, record)
+ else
+ $log.trace {"filtered: #{Time.at(time)} #{tag} #{record.inspect}"}
+ end
+ }
+ }
+ output_es
+ end
+
+ def timeout_block
+ begin
+ Timeout.timeout(@timeout){
+ yield
+ }
+ rescue Timeout::Error
+ $log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"}
end
end
end
end