lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.4 vs lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.5
- old
+ new
@@ -8,26 +8,95 @@
config_param :key, :string, :default => nil #deprected
config_param :time, :string, :default => nil
config_param :record, :string, :default => nil
config_param :multi, :bool, :default => false
config_param :timeout, :time, :default => 1
+ config_param :format, :string, :default => nil
+ MMAP_MAX_NUM = 50
+
def configure(conf)
super
- if @map
- $log.debug { "map: #{@map}" }
- @mode = "tuple"
+ @format = determine_format()
+ configure_format()
+ @map = create_map(conf)
+ end
+
+ def determine_format()
+ if @format
+ @format
+ elsif @map
+ "map"
elsif (@tag || @key) && @time && @record
+ "record"
+ else
+ raise ConfigError, "Any of map, 3 parameters(key, time, and record) or format is required "
+ end
+ end
+
+ def configure_format()
+ case @format
+ when "map"
+ # pass
+ when "record"
@tag ||= @key
raise ConfigError, "multi and 3 parameters(tag, time, and record) are not compatible" if @multi
- $log.debug { "tag: #{@tag}, time: #{@time}, record: #{@record}" }
- @mode = "each"
+ when "multimap"
+ # pass.
else
- raise ConfigError, "Either map or 3 parameters(key, time, and record) is required "
+ raise ConfigError, "format #{@format} is invalid."
end
end
+ def create_map(conf)
+ # return string like double array.
+ case @format
+ when "map"
+ parse_map()
+ when "record"
+ "[[#{@tag}, #{@time}, #{@record}]]"
+ when "multimap"
+ parse_multimap(conf)
+ end
+ end
+
+ def parse_map()
+ if @multi
+ @map
+ else
+ "[#{@map}]"
+ end
+ end
+
+ def parse_multimap(conf)
+ check_mmap_range(conf)
+
+ prev_mmap = nil
+ result_mmaps = (1..MMAP_MAX_NUM).map { |i|
+ mmap = conf["mmap#{i}"]
+ if (i > 1) && prev_mmap.nil? && !mmap.nil?
+ raise ConfigError, "Jump of mmap index found. mmap#{i - 1} is missing."
+ end
+ prev_mmap = mmap
+ next if mmap.nil?
+
+ mmap
+ }.compact.join(',')
+ "[#{result_mmaps}]"
+ end
+
+ def check_mmap_range(conf)
+ invalid_mmap = conf.keys.select { |k|
+ m = k.match(/^mmap(\d+)$/)
+ m ? !((1..MMAP_MAX_NUM).include?(m[1].to_i)) : false
+ }
+ unless invalid_mmap.empty?
+ raise ConfigError, "Invalid mmapN found. N should be 1 - #{MMAP_MAX_NUM}: " + invalid_mmap.join(",")
+ end
+ end
+
+
def emit(tag, es, chain)
begin
tag_output_es = do_map(tag, es)
tag_output_es.each_pair do |tag, output_es|
Fluent::Engine::emit_stream(tag, output_es)
@@ -40,15 +109,12 @@
e #for test
end
end
def do_map(tag, es)
- tuples = if @multi
- generate_tuples_multi(tag, es)
- else
- generate_tuples_single(tag, es)
- end
+ tuples = generate_tuples(tag, es)
+
tag_output_es = Hash.new{|h, key| h[key] = MultiEventStream::new}
tuples.each do |tag, time, record|
if time == nil || record == nil
raise SyntaxError.new
end
@@ -56,35 +122,16 @@
$log.trace { [tag, time, record].inspect }
end
tag_output_es
end
- def generate_tuples_multi(tag, es)
+ def generate_tuples(tag, es)
tuples = []
es.each {|time, record|
new_tuple = eval(@map)
tuples.concat new_tuple
}
- tuples
- end
-
- def generate_tuples_single(tag, es)
- tuples = []
- es.each {|time, record|
- timeout_block(tag, time, record){
- case @mode
- when "tuple"
- new_tuple = eval(@map)
- tuples << new_tuple
- when "each"
- new_tag = eval(@tag)
- new_time = eval(@time)
- new_record = eval(@record)
- tuples << [new_tag, new_time, new_record]
- end
- }
- }
- tuples
+ tuples
end
def timeout_block(tag, time, record)
begin
Timeout.timeout(@timeout){