lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.3.1 vs lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.4

- old
+ new

@@ -7,10 +7,11 @@ config_param :tag, :string, :default => nil config_param :key, :string, :default => nil #deprected config_param :time, :string, :default => nil config_param :record, :string, :default => nil config_param :multi, :bool, :default => false + config_param :timeout, :time, :default => 1 def configure(conf) super if @map $log.debug { "map: #{@map}" } @@ -25,39 +26,74 @@ end end def emit(tag, es, chain) begin - tuples = [] - es.each {|time, record| + tag_output_es = do_map(tag, es) + tag_output_es.each_pair do |tag, output_es| + Fluent::Engine::emit_stream(tag, output_es) + end + chain.next + tag_output_es + rescue SyntaxError => e + chain.next + $log.error "map command is syntax error: #{@map}" + e #for test + end + end + + def do_map(tag, es) + tuples = if @multi + generate_tuples_multi(tag, es) + else + generate_tuples_single(tag, es) + end + tag_output_es = Hash.new{|h, key| h[key] = MultiEventStream::new} + tuples.each do |tag, time, record| + if time == nil || record == nil + raise SyntaxError.new + end + tag_output_es[tag].add(time, record) + $log.trace { [tag, time, record].inspect } + end + tag_output_es + end + + def generate_tuples_multi(tag, es) + tuples = [] + es.each {|time, record| + new_tuple = eval(@map) + tuples.concat new_tuple + } + tuples + end + + def generate_tuples_single(tag, es) + tuples = [] + es.each {|time, record| + timeout_block(tag, time, record){ case @mode when "tuple" new_tuple = eval(@map) - if @multi - tuples.concat new_tuple - else - tuples << new_tuple - end + tuples << new_tuple when "each" new_tag = eval(@tag) new_time = eval(@time) new_record = eval(@record) tuples << [new_tag, new_time, new_record] end } - tuples.each do |tag, time, record| - if time == nil or record == nil - raise SyntaxError.new - end - $log.trace { [tag, time, record].inspect } - Fluent::Engine::emit(tag, time, record) - end - chain.next - tuples - rescue SyntaxError => e - chain.next - $log.error "map command is syntax error: #{@map}" - e #for test + } + tuples + end + + def timeout_block(tag, time, record) + begin + Timeout.timeout(@timeout){ + yield + } + rescue Timeout::Error + $log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"} end end end end