# # Fluent # # Copyright (C) 2011 FURUHASHI Sadayuki # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # module Fluent require 'fluent/registry' class ParserError < StandardError end class Parser include Configurable # SET false BEFORE CONFIGURE, to return nil when time not parsed # 'configure()' may raise errors for unexpected configurations attr_accessor :estimate_current_event def initialize super @estimate_current_event = true end def configure(conf) super end def parse(text) raise NotImplementedError, "Implement this method in child class" end # Keep backward compatibility for existing plugins def call(*a, &b) parse(*a, &b) end end class TextParser # Keep backward compatibility for existing plugins ParserError = ::Fluent::ParserError class TimeParser def initialize(time_format) @cache1_key = nil @cache1_time = nil @cache2_key = nil @cache2_time = nil @parser = if time_format Proc.new { |value| Time.strptime(value, time_format) } else Time.method(:parse) end end def parse(value) unless value.is_a?(String) raise ParserError, "value must be string: #{value}" end if @cache1_key == value return @cache1_time elsif @cache2_key == value return @cache2_time else begin time = @parser.call(value).to_i rescue => e raise ParserError, "invalid time format: value = #{value}, error_class = #{e.class.name}, error = #{e.message}" end @cache1_key = @cache2_key @cache1_time = @cache2_time @cache2_key = value @cache2_time = time return time end end end module TypeConverter Converters = { 'string' => lambda { |v| v.to_s }, 'integer' => lambda { |v| v.to_i }, 'float' => lambda { |v| v.to_f }, 'bool' => lambda { |v| case v.downcase when 'true', 'yes', '1' true else false end }, 'time' => lambda { |v, time_parser| time_parser.parse(v) }, 'array' => lambda { |v, delimiter| v.to_s.split(delimiter) } } def self.included(klass) klass.instance_eval { config_param :types, :string, :default => nil config_param :types_delimiter, :string, :default => ',' config_param :types_label_delimiter, :string, :default => ':' } end def configure(conf) super @type_converters = parse_types_parameter unless @types.nil? end private def convert_type(name, value) converter = @type_converters[name] converter.nil? ? value : converter.call(value) end def parse_types_parameter converters = {} @types.split(@types_delimiter).each { |pattern_name| name, type, format = pattern_name.split(@types_label_delimiter, 3) raise ConfigError, "Type is needed" if type.nil? case type when 'time' t_parser = TimeParser.new(format) converters[name] = lambda { |v| Converters[type].call(v, t_parser) } when 'array' delimiter = format || ',' converters[name] = lambda { |v| Converters[type].call(v, delimiter) } else converters[name] = Converters[type] end } converters end end class RegexpParser < Parser include TypeConverter config_param :time_format, :string, :default => nil def initialize(regexp, conf={}) super() @regexp = regexp unless conf.empty? configure(conf) end @time_parser = TimeParser.new(@time_format) @mutex = Mutex.new end def configure(conf) super @time_parser = TimeParser.new(@time_format) end def patterns {'format' => @regexp, 'time_format' => @time_format} end def parse(text) m = @regexp.match(text) unless m if block_given? yield nil, nil return else return nil, nil end end time = nil record = {} m.names.each {|name| if value = m[name] case name when "time" time = @mutex.synchronize { @time_parser.parse(value) } else record[name] = if @type_converters.nil? value else convert_type(name, value) end end end } if @estimate_current_event time ||= Engine.now end if block_given? yield time, record else # keep backward compatibility. will be removed at v1 return time, record end end end class JSONParser < Parser config_param :time_key, :string, :default => 'time' config_param :time_format, :string, :default => nil def configure(conf) super unless @time_format.nil? @time_parser = TimeParser.new(@time_format) @mutex = Mutex.new end end def parse(text) record = Yajl.load(text) if value = record.delete(@time_key) if @time_format time = @mutex.synchronize { @time_parser.parse(value) } else begin time = value.to_i rescue => e raise ParserError, "invalid time value: value = #{value}, error_class = #{e.class.name}, error = #{e.message}" end end else if @estimate_current_event time = Engine.now else time = nil end end if block_given? yield time, record else return time, record end rescue Yajl::ParseError if block_given? yield nil, nil else return nil, nil end end end class ValuesParser < Parser include TypeConverter config_param :keys, :string config_param :time_key, :string, :default => nil config_param :time_format, :string, :default => nil def configure(conf) super @keys = @keys.split(",") if @time_key && !@keys.include?(@time_key) && @estimate_current_event raise ConfigError, "time_key (#{@time_key.inspect}) is not included in keys (#{@keys.inspect})" end if @time_format && !@time_key raise ConfigError, "time_format parameter is ignored because time_key parameter is not set. at #{conf.inspect}" end @time_parser = TimeParser.new(@time_format) @mutex = Mutex.new end def values_map(values) record = Hash[keys.zip(values)] if @time_key value = record.delete(@time_key) time = if value.nil? if @estimate_current_event Engine.now else nil end else @mutex.synchronize { @time_parser.parse(value) } end elsif @estimate_current_event time = Engine.now else time = nil end convert_field_type!(record) if @type_converters return time, record end private def convert_field_type!(record) @type_converters.each_key { |key| if value = record[key] record[key] = convert_type(key, value) end } end end class TSVParser < ValuesParser config_param :delimiter, :string, :default => "\t" def configure(conf) super @key_num = @keys.length end def parse(text) if block_given? yield values_map(text.split(@delimiter, @key_num)) else return values_map(text.split(@delimiter, @key_num)) end end end class LabeledTSVParser < ValuesParser config_param :delimiter, :string, :default => "\t" config_param :label_delimiter, :string, :default => ":" config_param :time_key, :string, :default => "time" def configure(conf) conf['keys'] = conf['time_key'] || 'time' super(conf) end def parse(text) @keys = [] values = [] text.split(delimiter).each do |pair| key, value = pair.split(label_delimiter, 2) @keys.push(key) values.push(value) end if block_given? yield values_map(values) else return values_map(values) end end end class CSVParser < ValuesParser def initialize super require 'csv' end def parse(text) if block_given? yield values_map(CSV.parse_line(text)) else return values_map(CSV.parse_line(text)) end end end class NoneParser < Parser config_param :message_key, :string, :default => 'message' def parse(text) record = {} record[@message_key] = text time = @estimate_current_event ? Engine.now : nil if block_given? yield time, record else return time, record end end end class ApacheParser < Parser REGEXP = /^(?[^ ]*) [^ ]* (?[^ ]*) \[(?