lib/fluent/plugin/gelf_plugin_util.rb in fluent-plugin-gelf-best-1.2.0 vs lib/fluent/plugin/gelf_plugin_util.rb in fluent-plugin-gelf-best-1.3.0
- old
+ new
@@ -1,34 +1,56 @@
+require 'oj'
+require 'date'
+require 'gelf'
+
module Fluent
module GelfPluginUtil
- require 'gelf'
LEVEL_MAP = {
- "0" => GELF::UNKNOWN, "1" => GELF::UNKNOWN, "a" => GELF::UNKNOWN,
- "2" => GELF::FATAL, "c" => GELF::FATAL,
- "3" => GELF::ERROR,
- "4" => GELF::WARN, "w" => GELF::WARN,
- "5" => GELF::INFO, "n" => GELF::INFO,
- "6" => GELF::INFO, "i" => GELF::INFO,
- "7" => GELF::DEBUG, "d" => GELF::DEBUG,
- "e" => GELF::ERROR # assuming 'e' stands typically for 'error'
- }
+ '0' => GELF::UNKNOWN, '1' => GELF::UNKNOWN, 'a' => GELF::UNKNOWN,
+ '2' => GELF::FATAL, 'c' => GELF::FATAL,
+ '3' => GELF::ERROR,
+ '4' => GELF::WARN, 'w' => GELF::WARN,
+ '5' => GELF::INFO, 'n' => GELF::INFO,
+ '6' => GELF::INFO, 'i' => GELF::INFO,
+ '7' => GELF::DEBUG, 'd' => GELF::DEBUG,
+ 'e' => GELF::ERROR
+ }.freeze
+ def get_internal_json_out(record, key)
+ json_data = parse_json_field(record[key])
+ return record unless json_data
+
+ record.delete(key)
+ merge_record_with_json(record, json_data)
+ rescue Oj::ParseError, EncodingError
+ # Return original record if JSON parsing fails
+ record
+ end
+
def make_gelfentry(tag, time, record, conf = {})
- gelfentry = {"_fluentd_tag" => tag}
- gelfentry["timestamp"] = calculate_timestamp(time)
+ record = get_internal_json_out(record, "message")
+ record = get_internal_json_out(record, "log")
+ gelfentry = {"_fluentd_tag" => tag, "timestamp" => calculate_timestamp(time)}
- record.each_pair do |k, v|
- gelfentry.merge!(process_record_entry(k, v, conf, gelfentry))
+ record.each do |k, v|
+ process_record_entry(k, v, conf, gelfentry)
end
ensure_short_message(gelfentry)
gelfentry.compact
end
private
+ def parse_json_field(json_field)
+ return nil unless json_field
+ json = Oj.load(json_field.strip)
+ return nil unless json.is_a?(Hash)
+ json
+ end
+
def calculate_timestamp(time)
if defined?(Fluent::EventTime) && time.is_a?(Fluent::EventTime)
time.sec + (time.nsec.to_f / 1_000_000_000).round(3)
else
time
@@ -36,45 +58,31 @@
end
def process_record_entry(k, v, conf, gelfentry)
case k
when 'host', 'hostname'
- return {'host' => (conf[:use_record_host] ? v : gelfentry['_host'] = v)}
+ gelfentry['host'] = conf[:use_record_host] ? v : (gelfentry['_host'] = v)
when 'timestamp', 'time'
- { 'timestamp' => parse_timestamp(v) }
+ gelfentry['timestamp'] = parse_timestamp(v)
when 'level'
- {'level' => LEVEL_MAP[v.to_s.downcase[0]] || (v.to_s.length >= 2 && v.to_s.downcase[1] != "r" ? GELF::UNKNOWN : v)}
+ gelfentry['level'] = LEVEL_MAP[v.to_s.downcase[0]] || GELF::UNKNOWN
when 'msec'
- conf[:add_msec_time] ? {'timestamp' => "#{time.to_s}.#{v}".to_f} : {'_msec' => v}
+ gelfentry['timestamp'] = conf[:add_msec_time] ? "#{time.to_s}.#{v}".to_f : (gelfentry['_msec'] = v)
when 'short_message', 'version', 'full_message', 'facility', 'file', 'line'
- {k => v}
+ gelfentry[k] = v
else
- {k.start_with?('_') ? k : "_#{k}" => v}
+ gelfentry[k.start_with?('_') ? k : "_#{k}"] = v
end
end
def parse_timestamp(v)
- if v.is_a?(Integer) || v.is_a?(Float)
- v
- else
- begin
- (DateTime.parse(v).strftime("%Q").to_f / 1_000).round(3)
- rescue ArgumentError
- v
- end
- end
+ return v if v.is_a?(Integer) || v.is_a?(Float)
+
+ DateTime.parse(v).strftime("%Q").to_f / 1_000 rescue v
end
def ensure_short_message(gelfentry)
- return if gelfentry['short_message'] && !gelfentry['short_message'].to_s.strip.empty?
-
- ['_message', '_msg', '_log', '_record'].each do |key|
- if gelfentry[key] && !gelfentry[key].to_s.strip.empty?
- gelfentry['short_message'] = gelfentry.delete(key)
- return
- end
- end
-
- gelfentry['short_message'] = '(no message)' unless gelfentry['short_message']
+ default_key = ['_message', '_msg', '_log', '_record'].find { |key| gelfentry[key]&.strip&.empty? == false }
+ gelfentry['short_message'] = default_key ? gelfentry.delete(default_key) : '(no message)'
end
end
end