lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.3.1 vs lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.4
- old
+ new
@@ -7,10 +7,11 @@
config_param :tag, :string, :default => nil
config_param :key, :string, :default => nil #deprected
config_param :time, :string, :default => nil
config_param :record, :string, :default => nil
config_param :multi, :bool, :default => false
+ config_param :timeout, :time, :default => 1
def configure(conf)
super
if @map
$log.debug { "map: #{@map}" }
@@ -25,39 +26,74 @@
end
end
def emit(tag, es, chain)
begin
- tuples = []
- es.each {|time, record|
+ tag_output_es = do_map(tag, es)
+ tag_output_es.each_pair do |tag, output_es|
+ Fluent::Engine::emit_stream(tag, output_es)
+ end
+ chain.next
+ tag_output_es
+ rescue SyntaxError => e
+ chain.next
+ $log.error "map command is syntax error: #{@map}"
+ e #for test
+ end
+ end
+
+ def do_map(tag, es)
+ tuples = if @multi
+ generate_tuples_multi(tag, es)
+ else
+ generate_tuples_single(tag, es)
+ end
+ tag_output_es = Hash.new{|h, key| h[key] = MultiEventStream::new}
+ tuples.each do |tag, time, record|
+ if time == nil || record == nil
+ raise SyntaxError.new
+ end
+ tag_output_es[tag].add(time, record)
+ $log.trace { [tag, time, record].inspect }
+ end
+ tag_output_es
+ end
+
+ def generate_tuples_multi(tag, es)
+ tuples = []
+ es.each {|time, record|
+ new_tuple = eval(@map)
+ tuples.concat new_tuple
+ }
+ tuples
+ end
+
+ def generate_tuples_single(tag, es)
+ tuples = []
+ es.each {|time, record|
+ timeout_block(tag, time, record){
case @mode
when "tuple"
new_tuple = eval(@map)
- if @multi
- tuples.concat new_tuple
- else
- tuples << new_tuple
- end
+ tuples << new_tuple
when "each"
new_tag = eval(@tag)
new_time = eval(@time)
new_record = eval(@record)
tuples << [new_tag, new_time, new_record]
end
}
- tuples.each do |tag, time, record|
- if time == nil or record == nil
- raise SyntaxError.new
- end
- $log.trace { [tag, time, record].inspect }
- Fluent::Engine::emit(tag, time, record)
- end
- chain.next
- tuples
- rescue SyntaxError => e
- chain.next
- $log.error "map command is syntax error: #{@map}"
- e #for test
+ }
+ tuples
+ end
+
+ def timeout_block(tag, time, record)
+ begin
+ Timeout.timeout(@timeout){
+ yield
+ }
+ rescue Timeout::Error
+ $log.error {"Timeout: #{Time.at(time)} #{tag} #{record.inspect}"}
end
end
end
end