lib/fluent/plugin/parser.rb in fluentd-0.14.8 vs lib/fluent/plugin/parser.rb in fluentd-0.14.9
- old
+ new
@@ -29,103 +29,163 @@
class ParserError < StandardError; end
configured_in :parse
- # SET false BEFORE CONFIGURE, to return nil when time not parsed
- attr_accessor :estimate_current_event
+ ### types can be specified as string-based hash style
+ # field1:type, field2:type, field3:type:option, field4:type:option
+ ### or, JSON format
+ # {"field1":"type", "field2":"type", "field3":"type:option", "field4":"type:option"}
+ config_param :types, :hash, value_type: :string, default: nil
+ # available options are:
+ # array: (1st) delimiter
+ # time : type[, format, timezone] -> type should be a valid "time_type"(string/unixtime/float)
+ # : format[, timezone]
+
+ config_param :time_key, :string, default: nil
+ config_param :null_value_pattern, :string, default: nil
+ config_param :null_empty_string, :bool, default: false
+ config_param :estimate_current_event, :bool, default: true
config_param :keep_time_key, :bool, default: false
- def initialize
+ AVAILABLE_PARSER_VALUE_TYPES = ['string', 'integer', 'float', 'bool', 'time', 'array']
+
+ # for tests
+ attr_reader :type_converters
+
+ PARSER_TYPES = [:text_per_line, :text, :binary]
+ def parser_type
+ :text_per_line
+ end
+
+ def configure(conf)
super
- @estimate_current_event = true
+
+ @time_parser = time_parser_create
+ @null_value_regexp = @null_value_pattern && Regexp.new(@null_value_pattern)
+ @type_converters = build_type_converters(@types)
+ @execute_convert_values = @type_converters || @null_value_regexp || @null_empty_string
end
- def parse(text)
+ def parse(text, &block)
raise NotImplementedError, "Implement this method in child class"
end
def call(*a, &b)
# Keep backward compatibility for existing plugins
# TODO: warn when deprecated
parse(*a, &b)
end
- TimeParser = Fluent::TimeParser
- end
+ def implement?(feature)
+ methods_of_plugin = self.class.instance_methods(false)
+ case feature
+ when :parse_io then methods_of_plugin.include?(:parse_io)
+ when :parse_partial_data then methods_of_plugin.include?(:parse_partial_data)
+ else
+ raise ArgumentError, "Unknown feature for parser plugin: #{feature}"
+ end
+ end
- class ValuesParser < Parser
- include Fluent::TypeConverter
+ def parse_io(io, &block)
+ raise NotImplementedError, "Optional API #parse_io is not implemented"
+ end
- config_param :keys, :array, default: []
- config_param :time_key, :string, default: nil
- config_param :null_value_pattern, :string, default: nil
- config_param :null_empty_string, :bool, default: false
+ def parse_partial_data(data, &block)
+ raise NotImplementedError, "Optional API #parse_partial_data is not implemented"
+ end
- def configure(conf)
- super
-
- if @time_key && !@keys.include?(@time_key) && @estimate_current_event
- raise Fluent::ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})"
+ def parse_time(record)
+ if @time_key && record.respond_to?(:has_key?) && record.has_key?(@time_key)
+ src = if @keep_time_key
+ record[@time_key]
+ else
+ record.delete(@time_key)
+ end
+ @time_parser.parse(src)
+ elsif @estimate_current_event
+ Fluent::EventTime.now
+ else
+ nil
end
+ rescue Fluent::TimeParser::TimeParseError => e
+ raise ParserError, e.message
+ end
- if @time_format && !@time_key
- raise Fluent::ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}"
- end
+ # def parse(text, &block)
+ # time, record = convert_values(time, record)
+ # yield time, record
+ # end
+ def convert_values(time, record)
+ return time, record unless @execute_convert_values
- @time_parser = time_parser_create
+ record.each_key do |key|
+ value = record[key]
+ next unless value # nil/null value is always left as-is.
- if @null_value_pattern
- @null_value_pattern = Regexp.new(@null_value_pattern)
+ if value.is_a?(String) && string_like_null(value)
+ record[key] = nil
+ next
+ end
+
+ if @type_converters && @type_converters.has_key?(key)
+ record[key] = @type_converters[key].call(value)
+ end
end
- @mutex = Mutex.new
+ return time, record
end
- def values_map(values)
- record = Hash[keys.zip(values.map { |value| convert_value_to_nil(value) })]
+ def string_like_null(value, null_empty_string = @null_empty_string, null_value_regexp = @null_value_regexp)
+ null_empty_string && value.empty? || null_value_regexp && string_safe_encoding(value){|s| null_value_regexp.match(s) }
+ end
- if @time_key
- value = @keep_time_key ? record[@time_key] : record.delete(@time_key)
- time = if value.nil?
- if @estimate_current_event
- Fluent::EventTime.now
- else
- nil
- end
- else
- @mutex.synchronize { @time_parser.parse(value) }
- end
- elsif @estimate_current_event
- time = Fluent::EventTime.now
- else
- time = nil
- end
+ TRUTHY_VALUES = ['true', 'yes', '1']
- convert_field_type!(record) if @type_converters
+ def build_type_converters(types)
+ return nil unless types
- return time, record
- end
+ converters = {}
- private
-
- def convert_field_type!(record)
- @type_converters.each_key { |key|
- if value = record[key]
- record[key] = convert_type(key, value)
+ types.each_pair do |field_name, type_definition|
+ type, option = type_definition.split(":", 2)
+ unless AVAILABLE_PARSER_VALUE_TYPES.include?(type)
+ raise Fluent::ConfigError, "unknown value conversion for key:'#{field_name}', type:'#{type}'"
end
- }
- end
- def convert_value_to_nil(value)
- if value and @null_empty_string
- value = (value == '') ? nil : value
+ conv = case type
+ when 'string' then ->(v){ v.to_s }
+ when 'integer' then ->(v){ v.to_i rescue v.to_s.to_i }
+ when 'float' then ->(v){ v.to_f rescue v.to_s.to_f }
+ when 'bool' then ->(v){ TRUTHY_VALUES.include?(v.to_s.downcase) }
+ when 'time'
+ # comma-separated: time:[timezone:]time_format
+ # time_format is unixtime/float/string-time-format
+ timep = if option
+ time_type = 'string' # estimate
+ timezone, time_format = option.split(':', 2)
+ unless Fluent::Timezone.validate(timezone)
+ timezone, time_format = nil, option
+ end
+ if Fluent::TimeMixin::TIME_TYPES.include?(time_format)
+ time_type, time_format = time_format, nil # unixtime/float
+ end
+ time_parser_create(type: time_type.to_sym, format: time_format, timezone: timezone)
+ else
+ time_parser_create(type: :string, format: nil, timezone: nil)
+ end
+ ->(v){ timep.parse(v) rescue nil }
+ when 'array'
+ delimiter = option ? option.to_s : ','
+ ->(v){ string_safe_encoding(v.to_s){|s| s.split(delimiter) } }
+ else
+ raise "BUG: unknown type even after check: #{type}"
+ end
+ converters[field_name] = conv
end
- if value and @null_value_pattern
- value = ::Fluent::StringUtil.match_regexp(@null_value_pattern, value) ? nil : value
- end
- value
+
+ converters
end
end
end
end