#
# Fluentd ViaQ data model Filter Plugin
#
# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/filter'
require 'fluent/log'

module Fluent
  # Filter registered as 'viaq_data_model'. For each record it:
  # * optionally moves every top-level field not listed in the keep lists into
  #   a single nested field (named by undefined_name) when use_undefined is set;
  # * recursively removes nil/empty values, except top-level fields listed in
  #   keep_empty_fields;
  # * optionally renames the source timestamp field (src_time_name) to
  #   dest_time_name.
  # Setting the CDM_DEBUG environment variable logs each record before and
  # after filtering (skipped for the tag in CDM_DEBUG_IGNORE_TAG).
  class ViaqDataModelFilter < Filter
    Fluent::Plugin.register_filter('viaq_data_model', self)

    desc 'Default list of comma-delimited fields to keep in each record'
    config_param :default_keep_fields, default: [] do |val|
      val.split(',')
    end

    desc 'Optional extra list of comma-delimited fields to keep in each record'
    config_param :extra_keep_fields, default: [] do |val|
      val.split(',')
    end

    # The kibana pod emits log records with an empty message field, and we
    # want to preserve these empty messages.
    desc 'List of fields to keep as empty fields - also added to extra_keep_fields'
    config_param :keep_empty_fields, default: ['message'] do |val|
      val.split(',')
    end

    desc 'Use "undefined" field to store fields not in above lists'
    config_param :use_undefined, :bool, default: false

    desc 'Name of undefined field to store fields not in above lists if use_undefined is true'
    config_param :undefined_name, :string, default: 'undefined'

    # We can't directly add a field called @timestamp in a record_transform
    # filter because the '@' is special to fluentd.
    desc 'Rename timestamp field to Elasticsearch compatible name'
    config_param :rename_time, :bool, default: true

    desc 'Rename timestamp field to Elasticsearch compatible name only if the destination field does not already exist'
    config_param :rename_time_if_missing, :bool, default: false

    desc 'Name of source timestamp field'
    config_param :src_time_name, :string, default: 'time'

    desc 'Name of destination timestamp field'
    config_param :dest_time_name, :string, default: '@timestamp'

    # Builds the lookup tables used by #filter and validates the combination
    # of options.
    #
    # @param conf [Fluent::Config::Element] plugin configuration
    # @raise [Fluent::ConfigError] if undefined_name is itself a kept field, or
    #   if the time field would be moved into the undefined container before it
    #   can be renamed.
    def configure(conf)
      super
      # keep_empty_fields are implicitly kept fields as well
      @keep_fields = {}
      (@default_keep_fields + @extra_keep_fields + @keep_empty_fields).each do |field|
        @keep_fields[field] = true
      end
      @keep_empty_fields_hash = {}
      @keep_empty_fields.each { |field| @keep_empty_fields_hash[field] = true }

      if @use_undefined && @keep_fields.key?(@undefined_name)
        raise Fluent::ConfigError, "Do not put [#{@undefined_name}] in default_keep_fields or extra_keep_fields"
      end
      # Either rename mode reads src_time_name from the top level of the record,
      # so it must not be swept into the undefined container.
      if (@rename_time || @rename_time_if_missing) && @use_undefined && !@keep_fields.key?(@src_time_name)
        raise Fluent::ConfigError, "Field [#{@src_time_name}] must be listed in default_keep_fields or extra_keep_fields"
      end
    end

    def start
      super
    end

    def shutdown
      super
    end

    # Returns true only if thing responds to empty? and is empty. Values that
    # don't respond to empty? are treated as non-empty, e.g. the Integer 0.
    def isempty(thing)
      thing.respond_to?(:empty?) && thing.empty?
    end

    # Recursively deletes nil and empty values from thing, in place, when
    # thing supports delete_if (Hash values, or elements of other collections).
    # Returns thing itself (the same object).
    def delempty(thing)
      if thing.respond_to?(:delete_if)
        if thing.is_a?(Hash)
          thing.delete_if { |_key, value| prunable?(value) }
        else
          # assume a single-element iterable such as an Array
          thing.delete_if { |elem| prunable?(elem) }
        end
      end
      thing
    end

    # Normalizes one record in place and returns it.
    #
    # @param tag [String] event tag
    # @param time event time
    # @param record [Hash] event record (mutated)
    # @return [Hash] the same record object
    def filter(tag, time, record)
      debug_record('input', tag, time, record)

      move_undefined_fields(record) if @use_undefined

      # remove the field from record if it is not in the list of fields to
      # keep empty and it is empty (nested empties are pruned as a side effect)
      record.delete_if { |key, value| !@keep_empty_fields_hash.key?(key) && prunable?(value) }

      # probably shouldn't remove everything . . .
      log.warn("Empty record! tag [#{tag}] time [#{time}]") if record.empty?

      rename_time_field(record)

      debug_record('output', tag, time, record)
      record
    end

    private

    # True if value should be dropped: it is nil, or it is empty after its
    # own nested empty values have been pruned.
    def prunable?(value)
      value.nil? || isempty(delempty(value))
    end

    # Moves every top-level field not in @keep_fields into a hash stored under
    # @undefined_name. Keys are removed from record BEFORE the container is
    # assigned, so an incoming field that happens to share undefined_name is
    # preserved inside the container instead of deleting the container itself.
    def move_undefined_fields(record)
      undefined_keys = record.keys.reject { |key| @keep_fields.key?(key) }
      # only set the undefined field if there are undefined fields
      return if undefined_keys.empty?

      undefined = {}
      undefined_keys.each { |key| undefined[key] = record.delete(key) }
      record[@undefined_name] = undefined
    end

    # Moves src_time_name to dest_time_name when renaming is enabled. With
    # rename_time_if_missing and an existing destination field, the
    # destination is left untouched and the source field is still removed.
    def rename_time_field(record)
      return unless (@rename_time || @rename_time_if_missing) && record.key?(@src_time_name)

      val = record.delete(@src_time_name)
      record[@dest_time_name] = val unless @rename_time_if_missing && record.key?(@dest_time_name)
    end

    # Logs the record at error level (so it is always visible) when CDM_DEBUG
    # is set, unless tag equals CDM_DEBUG_IGNORE_TAG. ENV is read on each call.
    def debug_record(stage, tag, time, record)
      return unless ENV['CDM_DEBUG']
      return if tag == ENV['CDM_DEBUG_IGNORE_TAG']

      log.error("#{stage} #{time} #{tag} #{record}")
    end
  end
end