lib/fluent/plugin/gelf_plugin_util.rb in fluent-plugin-gelf-best-1.2.0 vs lib/fluent/plugin/gelf_plugin_util.rb in fluent-plugin-gelf-best-1.3.0

- old
+ new

@@ -1,34 +1,56 @@ +require 'oj' +require 'date' +require 'gelf' + module Fluent module GelfPluginUtil - require 'gelf' LEVEL_MAP = { - "0" => GELF::UNKNOWN, "1" => GELF::UNKNOWN, "a" => GELF::UNKNOWN, - "2" => GELF::FATAL, "c" => GELF::FATAL, - "3" => GELF::ERROR, - "4" => GELF::WARN, "w" => GELF::WARN, - "5" => GELF::INFO, "n" => GELF::INFO, - "6" => GELF::INFO, "i" => GELF::INFO, - "7" => GELF::DEBUG, "d" => GELF::DEBUG, - "e" => GELF::ERROR # assuming 'e' stands typically for 'error' - } + '0' => GELF::UNKNOWN, '1' => GELF::UNKNOWN, 'a' => GELF::UNKNOWN, + '2' => GELF::FATAL, 'c' => GELF::FATAL, + '3' => GELF::ERROR, + '4' => GELF::WARN, 'w' => GELF::WARN, + '5' => GELF::INFO, 'n' => GELF::INFO, + '6' => GELF::INFO, 'i' => GELF::INFO, + '7' => GELF::DEBUG, 'd' => GELF::DEBUG, + 'e' => GELF::ERROR + }.freeze + def get_internal_json_out(record, key) + json_data = parse_json_field(record[key]) + return record unless json_data + + record.delete(key) + merge_record_with_json(record, json_data) + rescue Oj::ParseError, EncodingError + # Return original record if JSON parsing fails + record + end + def make_gelfentry(tag, time, record, conf = {}) - gelfentry = {"_fluentd_tag" => tag} - gelfentry["timestamp"] = calculate_timestamp(time) + record = get_internal_json_out(record, "message") + record = get_internal_json_out(record, "log") + gelfentry = {"_fluentd_tag" => tag, "timestamp" => calculate_timestamp(time)} - record.each_pair do |k, v| - gelfentry.merge!(process_record_entry(k, v, conf, gelfentry)) + record.each do |k, v| + process_record_entry(k, v, conf, gelfentry) end ensure_short_message(gelfentry) gelfentry.compact end private + def parse_json_field(json_field) + return nil unless json_field + json = Oj.load(json_field.strip) + return nil unless json.is_a?(Hash) + json + end + def calculate_timestamp(time) if defined?(Fluent::EventTime) && time.is_a?(Fluent::EventTime) time.sec + (time.nsec.to_f / 1_000_000_000).round(3) else time @@ -36,45 +58,31 @@ end def process_record_entry(k, v, conf, gelfentry) case k when 'host', 'hostname' - return {'host' => (conf[:use_record_host] ? v : gelfentry['_host'] = v)} + gelfentry['host'] = conf[:use_record_host] ? v : (gelfentry['_host'] = v) when 'timestamp', 'time' - { 'timestamp' => parse_timestamp(v) } + gelfentry['timestamp'] = parse_timestamp(v) when 'level' - {'level' => LEVEL_MAP[v.to_s.downcase[0]] || (v.to_s.length >= 2 && v.to_s.downcase[1] != "r" ? GELF::UNKNOWN : v)} + gelfentry['level'] = LEVEL_MAP[v.to_s.downcase[0]] || GELF::UNKNOWN when 'msec' - conf[:add_msec_time] ? {'timestamp' => "#{time.to_s}.#{v}".to_f} : {'_msec' => v} + gelfentry['timestamp'] = conf[:add_msec_time] ? "#{time.to_s}.#{v}".to_f : (gelfentry['_msec'] = v) when 'short_message', 'version', 'full_message', 'facility', 'file', 'line' - {k => v} + gelfentry[k] = v else - {k.start_with?('_') ? k : "_#{k}" => v} + gelfentry[k.start_with?('_') ? k : "_#{k}"] = v end end def parse_timestamp(v) - if v.is_a?(Integer) || v.is_a?(Float) - v - else - begin - (DateTime.parse(v).strftime("%Q").to_f / 1_000).round(3) - rescue ArgumentError - v - end - end + return v if v.is_a?(Integer) || v.is_a?(Float) + + DateTime.parse(v).strftime("%Q").to_f / 1_000 rescue v end def ensure_short_message(gelfentry) - return if gelfentry['short_message'] && !gelfentry['short_message'].to_s.strip.empty? - - ['_message', '_msg', '_log', '_record'].each do |key| - if gelfentry[key] && !gelfentry[key].to_s.strip.empty? - gelfentry['short_message'] = gelfentry.delete(key) - return - end - end - - gelfentry['short_message'] = '(no message)' unless gelfentry['short_message'] + default_key = ['_message', '_msg', '_log', '_record'].find { |key| gelfentry[key]&.strip&.empty? == false } + gelfentry['short_message'] = default_key ? gelfentry.delete(default_key) : '(no message)' end end end