lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.4 vs lib/fluent/plugin/out_map.rb in fluent-plugin-map-0.0.5

- old
+ new

@@ -8,26 +8,95 @@ config_param :key, :string, :default => nil #deprected config_param :time, :string, :default => nil config_param :record, :string, :default => nil config_param :multi, :bool, :default => false config_param :timeout, :time, :default => 1 + config_param :format, :string, :default => nil + MMAP_MAX_NUM = 50 + def configure(conf) super - if @map - $log.debug { "map: #{@map}" } - @mode = "tuple" + @format = determine_format() + configure_format() + @map = create_map(conf) + end + + def determine_format() + if @format + @format + elsif @map + "map" elsif (@tag || @key) && @time && @record + "record" + else + raise ConfigError, "Any of map, 3 parameters(key, time, and record) or format is required " + end + end + + def configure_format() + case @format + when "map" + # pass + when "record" @tag ||= @key raise ConfigError, "multi and 3 parameters(tag, time, and record) are not compatible" if @multi - $log.debug { "tag: #{@tag}, time: #{@time}, record: #{@record}" } - @mode = "each" + when "multimap" + # pass. else - raise ConfigError, "Either map or 3 parameters(key, time, and record) is required " + raise ConfigError, "format #{@format} is invalid." end end + def create_map(conf) + # return string like double array. + case @format + when "map" + parse_map() + when "record" + "[[#{@tag}, #{@time}, #{@record}]]" + when "multimap" + parse_multimap(conf) + end + end + + def parse_map() + if @multi + @map + else + "[#{@map}]" + end + end + + def parse_multimap(conf) + check_mmap_range(conf) + + prev_mmap = nil + result_mmaps = (1..MMAP_MAX_NUM).map { |i| + mmap = conf["mmap#{i}"] + if (i > 1) && prev_mmap.nil? && !mmap.nil? + raise ConfigError, "Jump of mmap index found. mmap#{i - 1} is missing." + end + prev_mmap = mmap + next if mmap.nil? + + mmap + }.compact.join(',') + "[#{result_mmaps}]" + end + + def check_mmap_range(conf) + invalid_mmap = conf.keys.select { |k| + m = k.match(/^mmap(\d+)$/) + m ? !((1..MMAP_MAX_NUM).include?(m[1].to_i)) : false + } + unless invalid_mmap.empty? + raise ConfigError, "Invalid mmapN found. N should be 1 - #{MMAP_MAX_NUM}: " + invalid_mmap.join(",") + end + end + + def emit(tag, es, chain) begin tag_output_es = do_map(tag, es) tag_output_es.each_pair do |tag, output_es| Fluent::Engine::emit_stream(tag, output_es) @@ -40,15 +109,12 @@ e #for test end end def do_map(tag, es) - tuples = if @multi - generate_tuples_multi(tag, es) - else - generate_tuples_single(tag, es) - end + tuples = generate_tuples(tag, es) + tag_output_es = Hash.new{|h, key| h[key] = MultiEventStream::new} tuples.each do |tag, time, record| if time == nil || record == nil raise SyntaxError.new end @@ -56,35 +122,16 @@ $log.trace { [tag, time, record].inspect } end tag_output_es end - def generate_tuples_multi(tag, es) + def generate_tuples(tag, es) tuples = [] es.each {|time, record| new_tuple = eval(@map) tuples.concat new_tuple } - tuples - end - - def generate_tuples_single(tag, es) - tuples = [] - es.each {|time, record| - timeout_block(tag, time, record){ - case @mode - when "tuple" - new_tuple = eval(@map) - tuples << new_tuple - when "each" - new_tag = eval(@tag) - new_time = eval(@time) - new_record = eval(@record) - tuples << [new_tag, new_time, new_record] - end - } - } - tuples + tuples end def timeout_block(tag, time, record) begin Timeout.timeout(@timeout){