lib/fluent/plugin/parser.rb in fluentd-0.14.8 vs lib/fluent/plugin/parser.rb in fluentd-0.14.9

- old
+ new

@@ -29,103 +29,163 @@ class ParserError < StandardError; end configured_in :parse - # SET false BEFORE CONFIGURE, to return nil when time not parsed - attr_accessor :estimate_current_event + ### types can be specified as string-based hash style + # field1:type, field2:type, field3:type:option, field4:type:option + ### or, JSON format + # {"field1":"type", "field2":"type", "field3":"type:option", "field4":"type:option"} + config_param :types, :hash, value_type: :string, default: nil + # available options are: + # array: (1st) delimiter + # time : type[, format, timezone] -> type should be a valid "time_type"(string/unixtime/float) + # : format[, timezone] + + config_param :time_key, :string, default: nil + config_param :null_value_pattern, :string, default: nil + config_param :null_empty_string, :bool, default: false + config_param :estimate_current_event, :bool, default: true config_param :keep_time_key, :bool, default: false - def initialize + AVAILABLE_PARSER_VALUE_TYPES = ['string', 'integer', 'float', 'bool', 'time', 'array'] + + # for tests + attr_reader :type_converters + + PARSER_TYPES = [:text_per_line, :text, :binary] + def parser_type + :text_per_line + end + + def configure(conf) super - @estimate_current_event = true + + @time_parser = time_parser_create + @null_value_regexp = @null_value_pattern && Regexp.new(@null_value_pattern) + @type_converters = build_type_converters(@types) + @execute_convert_values = @type_converters || @null_value_regexp || @null_empty_string end - def parse(text) + def parse(text, &block) raise NotImplementedError, "Implement this method in child class" end def call(*a, &b) # Keep backward compatibility for existing plugins # TODO: warn when deprecated parse(*a, &b) end - TimeParser = Fluent::TimeParser - end + def implement?(feature) + methods_of_plugin = self.class.instance_methods(false) + case feature + when :parse_io then methods_of_plugin.include?(:parse_io) + when :parse_partial_data then methods_of_plugin.include?(:parse_partial_data) + else + raise ArgumentError, "Unknown feature for parser plugin: #{feature}" + end + end - class ValuesParser < Parser - include Fluent::TypeConverter + def parse_io(io, &block) + raise NotImplementedError, "Optional API #parse_io is not implemented" + end - config_param :keys, :array, default: [] - config_param :time_key, :string, default: nil - config_param :null_value_pattern, :string, default: nil - config_param :null_empty_string, :bool, default: false + def parse_partial_data(data, &block) + raise NotImplementedError, "Optional API #parse_partial_data is not implemented" + end - def configure(conf) - super - - if @time_key && !@keys.include?(@time_key) && @estimate_current_event - raise Fluent::ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})" + def parse_time(record) + if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key) + src = if @keep_time_key + record[@time_key] + else + record.delete(@time_key) + end + @time_parser.parse(src) + elsif @estimate_current_event + Fluent::EventTime.now + else + nil end + rescue Fluent::TimeParser::TimeParseError => e + raise ParserError, e.message + end - if @time_format && !@time_key - raise Fluent::ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}" - end + # def parse(text, &block) + # time, record = convert_values(time, record) + # yield time, record + # end + def convert_values(time, record) + return time, record unless @execute_convert_values - @time_parser = time_parser_create + record.each_key do |key| + value = record[key] + next unless value # nil/null value is always left as-is. - if @null_value_pattern - @null_value_pattern = Regexp.new(@null_value_pattern) + if value.is_a?(String) && string_like_null(value) + record[key] = nil + next + end + + if @type_converters && @type_converters.has_key?(key) + record[key] = @type_converters[key].call(value) + end end - @mutex = Mutex.new + return time, record end - def values_map(values) - record = Hash[keys.zip(values.map { |value| convert_value_to_nil(value) })] + def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp) + null_empty_string && value.empty? || null_value_regexp && string_safe_encoding(value){|s| null_value_regexp.match(s) } + end - if @time_key - value = @keep_time_key ? record[@time_key] : record.delete(@time_key) - time = if value.nil? - if @estimate_current_event - Fluent::EventTime.now - else - nil - end - else - @mutex.synchronize { @time_parser.parse(value) } - end - elsif @estimate_current_event - time = Fluent::EventTime.now - else - time = nil - end + TRUTHY_VALUES = ['true', 'yes', '1'] - convert_field_type!(record) if @type_converters + def build_type_converters(types) + return nil unless types - return time, record - end + converters = {} - private - - def convert_field_type!(record) - @type_converters.each_key { |key| - if value = record[key] - record[key] = convert_type(key, value) + types.each_pair do |field_name, type_definition| + type, option = type_definition.split(":", 2) + unless AVAILABLE_PARSER_VALUE_TYPES.include?(type) + raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'" end - } - end - def convert_value_to_nil(value) - if value and @null_empty_string - value = (value == '') ? nil : value + conv = case type + when 'string' then ->(v){ v.to_s } + when 'integer' then ->(v){ v.to_i rescue v.to_s.to_i } + when 'float' then ->(v){ v.to_f rescue v.to_s.to_f } + when 'bool' then ->(v){ TRUTHY_VALUES.include?(v.to_s.downcase) } + when 'time' + # comma-separated: time:[timezone:]time_format + # time_format is unixtime/float/string-time-format + timep = if option + time_type = 'string' # estimate + timezone, time_format = option.split(':', 2) + unless Fluent::Timezone.validate(timezone) + timezone, time_format = nil, option + end + if Fluent::TimeMixin::TIME_TYPES.include?(time_format) + time_type, time_format = time_format, nil # unixtime/float + end + time_parser_create(type: time_type.to_sym, format: time_format, timezone: timezone) + else + time_parser_create(type: :string, format: nil, timezone: nil) + end + ->(v){ timep.parse(v) rescue nil } + when 'array' + delimiter = option ? option.to_s : ',' + ->(v){ string_safe_encoding(v.to_s){|s| s.split(delimiter) } } + else + raise "BUG: unknown type even after check: #{type}" + end + converters[field_name] = conv end - if value and @null_value_pattern - value = ::Fluent::StringUtil.match_regexp(@null_value_pattern, value) ? nil : value - end - value + + converters end end end end