#
# Fluentd ViaQ data model Filter Plugin
#
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'time'
require 'date'
require 'fluent/filter'
require 'fluent/log'
require 'fluent/match'
require_relative 'filter_viaq_data_model_systemd'
begin
ViaqMatchClass = Fluent::Match
rescue
  # Fluent::Match is not provided in fluentd 0.14
class ViaqMatchClass
def initialize(pattern_str, unused)
patterns = pattern_str.split(/\s+/).map {|str|
Fluent::MatchPattern.create(str)
}
if patterns.length == 1
@pattern = patterns[0]
else
@pattern = Fluent::OrMatchPattern.new(patterns)
end
end
def match(tag)
@pattern.match(tag)
end
    def to_s
      @pattern.to_s
    end
end
end
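#
# For illustration only (hypothetical tags): a matcher built from the pattern
# string "journal.system** mux.ops" matches a tag against either pattern, e.g.
#   m = ViaqMatchClass.new('journal.system** mux.ops', nil)
#   m.match('journal.system.kubelet') # => true
#   m.match('kubernetes.var.log')     # => false
#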
module Fluent
class ViaqDataModelFilter < Filter
include ViaqDataModelFilterSystemd
Fluent::Plugin.register_filter('viaq_data_model', self)
desc 'Default list of comma-delimited fields to keep in each record'
config_param :default_keep_fields, default: [] do |val|
val.split(',')
end
desc 'Optional extra list of comma-delimited fields to keep in each record'
config_param :extra_keep_fields, default: [] do |val|
val.split(',')
end
    # The kibana pod emits log records with an empty message field;
    # we want to preserve these empty messages.
desc 'List of fields to keep as empty fields - also added to extra_keep_fields'
config_param :keep_empty_fields, default: ['message'] do |val|
val.split(',')
end
desc 'Use "undefined" field to store fields not in above lists'
config_param :use_undefined, :bool, default: false
desc 'Name of undefined field to store fields not in above lists if use_undefined is true'
config_param :undefined_name, :string, default: 'undefined'
    # We can't directly add a field called @timestamp in a record_transformer
    # filter because the '@' prefix is special to fluentd.
desc 'Rename timestamp field to Elasticsearch compatible name'
config_param :rename_time, :bool, default: true
desc 'Rename timestamp field to Elasticsearch compatible name only if the destination field does not already exist'
config_param :rename_time_if_missing, :bool, default: false
desc 'Name of source timestamp field'
config_param :src_time_name, :string, default: 'time'
desc 'Name of destination timestamp field'
config_param :dest_time_name, :string, default: '@timestamp'
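    # For illustration, with the defaults above an input record such as
    #   {"time" => "2017-03-15T12:34:56.789012+00:00", "message" => "m"}
    # leaves the filter as
    #   {"@timestamp" => "2017-03-15T12:34:56.789012+00:00", "message" => "m"}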
    #
    # <formatter>
    #   type sys_journal
    #   tag "journal.system**"
    #   remove_keys log,stream,MESSAGE,_SOURCE_REALTIME_TIMESTAMP,__REALTIME_TIMESTAMP,CONTAINER_ID,CONTAINER_ID_FULL,CONTAINER_NAME,PRIORITY,_BOOT_ID,_CAP_EFFECTIVE,_CMDLINE,_COMM,_EXE,_GID,_HOSTNAME,_MACHINE_ID,_PID,_SELINUX_CONTEXT,_SYSTEMD_CGROUP,_SYSTEMD_SLICE,_SYSTEMD_UNIT,_TRANSPORT,_UID,_AUDIT_LOGINUID,_AUDIT_SESSION,_SYSTEMD_OWNER_UID,_SYSTEMD_SESSION,_SYSTEMD_USER_UNIT,CODE_FILE,CODE_FUNCTION,CODE_LINE,ERRNO,MESSAGE_ID,RESULT,UNIT,_KERNEL_DEVICE,_KERNEL_SUBSYSTEM,_UDEV_SYSNAME,_UDEV_DEVNODE,_UDEV_DEVLINK,SYSLOG_FACILITY,SYSLOG_IDENTIFIER,SYSLOG_PID
    # </formatter>
    #
# formatters will be processed in the order specified, so make sure more specific matches
# come before more general matches
desc 'Formatters for common data model, for well known record types'
config_section :formatter, param_name: :formatters do
desc 'is this formatter enabled?'
config_param :enabled, :bool, default: true
desc 'one of the well known formatter types'
config_param :type, :enum, list: [:sys_journal, :k8s_journal, :sys_var_log, :k8s_json_file]
desc 'process records with this tag pattern'
config_param :tag, :string
desc 'remove these keys from the record - same as record_transformer "remove_keys" field'
config_param :remove_keys, :string, default: nil
end
desc 'Which part of the pipeline is this - collector, normalizer, etc. for pipeline_metadata'
config_param :pipeline_type, :enum, list: [:collector, :normalizer], default: :collector
    # e.g.
    # <elasticsearch_index_name>
    #   tag "journal.system** system.var.log** **_default_** **_openshift_** **_openshift-infra_** mux.ops"
    #   name_type operations_full
    # </elasticsearch_index_name>
    # <elasticsearch_index_name>
    #   tag "**"
    #   name_type project_full
    # </elasticsearch_index_name>
# operations_full - ".operations.YYYY.MM.DD"
# operations_prefix - ".operations"
# project_full - "project.${kubernetes.namespace_name}.${kubernetes.namespace_id}.YYYY.MM.DD"
# project_prefix - "project.${kubernetes.namespace_name}.${kubernetes.namespace_id}"
# index names will be processed in the order specified, so make sure more specific matches
# come before more general matches e.g. make sure tag "**" is last
desc 'Construct Elasticsearch index names or prefixes based on the matching tags pattern and type'
config_section :elasticsearch_index_name, param_name: :elasticsearch_index_names do
desc 'is this index name enabled?'
config_param :enabled, :bool, default: true
desc 'create index names for records with this tag pattern'
config_param :tag, :string
desc 'type of index name to create'
config_param :name_type, :enum, list: [:operations_full, :project_full, :operations_prefix, :project_prefix]
end
desc 'Store the Elasticsearch index name in this field'
config_param :elasticsearch_index_name_field, :string, default: 'viaq_index_name'
desc 'Store the Elasticsearch index prefix in this field'
config_param :elasticsearch_index_prefix_field, :string, default: 'viaq_index_prefix'
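    # For illustration (the namespace name/id values are hypothetical), a record
    # matching an operations_full rule would get
    #   record['viaq_index_name'] = ".operations.2017.03.15"
    # and one matching a project_prefix rule would get
    #   record['viaq_index_prefix'] = "project.myproject.49f9a938-1234-5678-9abc-def012345678"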
def configure(conf)
super
@keep_fields = {}
@default_keep_fields.each{|xx| @keep_fields[xx] = true}
@extra_keep_fields.each{|xx| @keep_fields[xx] = true}
@keep_empty_fields_hash = {}
@keep_empty_fields.each do |xx|
@keep_empty_fields_hash[xx] = true
@keep_fields[xx] = true
end
if @use_undefined && @keep_fields.key?(@undefined_name)
raise Fluent::ConfigError, "Do not put [#{@undefined_name}] in default_keep_fields or extra_keep_fields"
end
      if (@rename_time || @rename_time_if_missing) && @use_undefined && !@keep_fields.key?(@src_time_name)
raise Fluent::ConfigError, "Field [#{@src_time_name}] must be listed in default_keep_fields or extra_keep_fields"
end
if @formatters
@formatters.each do |fmtr|
matcher = ViaqMatchClass.new(fmtr.tag, nil)
fmtr.instance_eval{ @params[:matcher] = matcher }
fmtr.instance_eval{ @params[:fmtr_type] = fmtr.type }
if fmtr.remove_keys
fmtr.instance_eval{ @params[:fmtr_remove_keys] = fmtr.remove_keys.split(',') }
else
fmtr.instance_eval{ @params[:fmtr_remove_keys] = nil }
end
case fmtr.type
when :sys_journal, :k8s_journal
fmtr_func = method(:process_journal_fields)
when :sys_var_log
fmtr_func = method(:process_sys_var_log_fields)
when :k8s_json_file
fmtr_func = method(:process_k8s_json_file_fields)
end
fmtr.instance_eval{ @params[:fmtr_func] = fmtr_func }
end
@formatter_cache = {}
@formatter_cache_nomatch = {}
end
begin
@docker_hostname = File.open('/etc/docker-hostname') { |f| f.readline }.rstrip
rescue
@docker_hostname = nil
end
@ipaddr4 = ENV['IPADDR4'] || '127.0.0.1'
@ipaddr6 = ENV['IPADDR6'] || '::1'
@pipeline_version = (ENV['FLUENTD_VERSION'] || 'unknown fluentd version') + ' ' + (ENV['DATA_VERSION'] || 'unknown data version')
# create the elasticsearch index name tag matchers
unless @elasticsearch_index_names.empty?
@elasticsearch_index_names.each do |ein|
matcher = ViaqMatchClass.new(ein.tag, nil)
ein.instance_eval{ @params[:matcher] = matcher }
end
end
end
def start
super
end
def shutdown
super
end
    # If thing doesn't respond to empty?, assume it isn't empty, e.g.
    # 0.respond_to?(:empty?) == false - the Fixnum 0 is not empty
def isempty(thing)
thing.respond_to?(:empty?) && thing.empty?
end
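    # e.g. isempty('')  # => true
    #      isempty([])  # => true
    #      isempty(0)   # => false - 0 does not respond to empty?
    #      isempty(nil) # => false - nil checks are done separately by callers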
# recursively delete empty fields and empty lists/hashes from thing
def delempty(thing)
if thing.respond_to?(:delete_if)
if thing.kind_of? Hash
thing.delete_if{|k,v| v.nil? || isempty(delempty(v)) || isempty(v)}
else # assume single element iterable
thing.delete_if{|elem| elem.nil? || isempty(delempty(elem)) || isempty(elem)}
end
end
thing
end
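    # e.g. delempty({'a' => '', 'b' => {'c' => []}, 'd' => 1}) first removes
    # 'a' and 'c', which leaves 'b' empty so it is removed too, returning
    # {'d' => 1}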
def process_sys_var_log_fields(tag, time, record, fmtr_type = nil)
record['systemd'] = {"t" => {"PID" => record['pid']}, "u" => {"SYSLOG_IDENTIFIER" => record['ident']}}
      unless record[@dest_time_name] # i.e. record already has @timestamp
        rectime = record['time'] || time
        # normalize to a Time object - the event time may be an Integer or Fluent::EventTime
        rectime = Time.at(rectime) unless rectime.is_a?(Time)
        # handle the case where the time reported in /var/log/messages is for a previous year
        if rectime > Time.now
          rectime = Time.new(rectime.year - 1, rectime.month, rectime.day, rectime.hour, rectime.min, rectime.sec, rectime.utc_offset)
        end
        record['time'] = rectime.utc.to_datetime.rfc3339(6)
end
if record['host'].eql?('localhost') && @docker_hostname
record['hostname'] = @docker_hostname
else
record['hostname'] = record['host']
end
end
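    # Hypothetical example of a k8s json-file record before formatting:
    #   {"log" => "hello world\n", "stream" => "stdout", "time" => <Time>,
    #    "kubernetes" => {"host" => "node1.example.com", ...}}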
def process_k8s_json_file_fields(tag, time, record, fmtr_type = nil)
      record['message'] ||= record['log']
record['level'] = (record['stream'] == 'stdout') ? 'info' : 'err'
if record['kubernetes'] && record['kubernetes']['host']
record['hostname'] = record['kubernetes']['host']
elsif @docker_hostname
record['hostname'] = @docker_hostname
end
      unless record[@dest_time_name] # i.e. record already has @timestamp
record['time'] = record['time'].utc.to_datetime.rfc3339(6)
end
end
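    # Find the first formatter whose tag pattern matches this tag and use it to
    # format the record. Matches (and non-matches) are cached per tag, so a
    # disabled formatter that matches also disables formatting for that tag.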
def check_for_match_and_format(tag, time, record)
return unless @formatters
return if @formatter_cache_nomatch[tag]
fmtr = @formatter_cache[tag]
unless fmtr
        idx = @formatters.index{|f| f.matcher.match(tag)}
if idx && (fmtr = @formatters[idx]).enabled
@formatter_cache[tag] = fmtr
else
@formatter_cache_nomatch[tag] = true
return
end
end
fmtr.fmtr_func.call(tag, time, record, fmtr.fmtr_type)
if record[@dest_time_name].nil? && record['time'].nil?
record['time'] = Time.at(time).utc.to_datetime.rfc3339(6)
end
if fmtr.fmtr_remove_keys
fmtr.fmtr_remove_keys.each{|k| record.delete(k)}
end
end
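    # Record which pipeline stage (collector or normalizer) handled this record.
    # For illustration (values are examples), a collector adds something like:
    #   record['pipeline_metadata'] == {"collector" => {
    #     "ipaddr4" => "10.0.0.1", "ipaddr6" => "::1",
    #     "inputname" => "fluent-plugin-systemd", "name" => "fluentd",
    #     "received_at" => "2017-03-15T12:34:56.789012+00:00",
    #     "version" => "0.12.39 1.6.0"}}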
    def add_pipeline_metadata(tag, time, record)
(record['pipeline_metadata'] ||= {})[@pipeline_type.to_s] = {
"ipaddr4" => @ipaddr4,
"ipaddr6" => @ipaddr6,
"inputname" => "fluent-plugin-systemd",
"name" => "fluentd",
"received_at" => Time.now.utc.to_datetime.rfc3339(6),
"version" => @pipeline_version
}
end
def add_elasticsearch_index_name_field(tag, time, record)
found = false
@elasticsearch_index_names.each do |ein|
if ein.matcher.match(tag)
found = true
return unless ein.enabled
if ein.name_type == :operations_full || ein.name_type == :project_full
field_name = @elasticsearch_index_name_field
need_time = true
else
field_name = @elasticsearch_index_prefix_field
need_time = false
end
case ein.name_type
when :operations_full, :operations_prefix
prefix = ".operations"
when :project_full, :project_prefix
if (k8s = record['kubernetes']).nil?
log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes field: #{record}")
break
elsif (name = k8s['namespace_name']).nil?
log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes.namespace_name field: #{record}")
break
elsif (uuid = k8s['namespace_id']).nil?
log.error("record cannot use elasticsearch index name type #{ein.name_type}: record is missing kubernetes.namespace_id field: #{record}")
break
else
prefix = "project." + name + "." + uuid
end
end
if ENV['CDM_DEBUG']
unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
log.error("prefix #{prefix} need_time #{need_time} time #{record[@dest_time_name]}")
end
end
if need_time
ts = DateTime.parse(record[@dest_time_name])
record[field_name] = prefix + "." + ts.strftime("%Y.%m.%d")
else
record[field_name] = prefix
end
if ENV['CDM_DEBUG']
unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
log.error("record[#{field_name}] = #{record[field_name]}")
end
end
break
end
end
      if !found && ENV['CDM_DEBUG'] && tag != ENV['CDM_DEBUG_IGNORE_TAG']
        log.error("no match for tag #{tag}")
      end
end
def filter(tag, time, record)
if ENV['CDM_DEBUG']
unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
log.error("input #{time} #{tag} #{record}")
end
end
check_for_match_and_format(tag, time, record)
add_pipeline_metadata(tag, time, record)
if @use_undefined
# undefined contains all of the fields not in keep_fields
undefined = record.reject{|k,v| @keep_fields.key?(k)}
# only set the undefined field if there are undefined fields
unless undefined.empty?
record[@undefined_name] = undefined
# remove the undefined fields from the record top level
record.delete_if{|k,v| undefined.key?(k)}
end
end
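      # e.g. (illustrative) with default_keep_fields message and use_undefined true,
      #   {"message" => "m", "foo" => "bar"} becomes
      #   {"message" => "m", "undefined" => {"foo" => "bar"}}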
# remove the field from record if it is not in the list of fields to keep and
# it is empty
record.delete_if{|k,v| !@keep_empty_fields_hash.key?(k) && (v.nil? || isempty(delempty(v)) || isempty(v))}
      # probably shouldn't remove everything...
log.warn("Empty record! tag [#{tag}] time [#{time}]") if record.empty?
# rename the time field
if (@rename_time || @rename_time_if_missing) && record.key?(@src_time_name)
val = record.delete(@src_time_name)
unless @rename_time_if_missing && record.key?(@dest_time_name)
record[@dest_time_name] = val
end
end
if !@elasticsearch_index_names.empty?
add_elasticsearch_index_name_field(tag, time, record)
elsif ENV['CDM_DEBUG']
unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
log.error("not adding elasticsearch index name or prefix")
end
end
if ENV['CDM_DEBUG']
unless tag == ENV['CDM_DEBUG_IGNORE_TAG']
log.error("output #{time} #{tag} #{record}")
end
end
record
end
end
end