#
# Copyright 2020- wally
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require "fluent/plugin/output"
require 'socket'
require 'json'
require 'pp'

module Fluent
  module Plugin
    # Output plugin that evaluates up to PATTERN_MAX_NUM Ruby conditions
    # against each event and re-emits the event with the tag (and optional
    # `<recordN>` fields) that belongs to the first condition evaluating to a
    # truthy value. When no condition is truthy, `tag_else` / `<record_else>`
    # are used instead.
    #
    # Conditions are evaluated as-is with `instance_eval`; tags and record
    # maps use `${...}` placeholders that are rewritten into Ruby string
    # interpolation at configure time.
    class ConditionCheckerOutput < Fluent::Plugin::Output
      Fluent::Plugin.register_output("condition_checker", self)

      helpers :event_emitter

      # Highest N accepted for tagN / conditionN / <recordN>.
      PATTERN_MAX_NUM = 20

      config_param :remove_keys, :string, :default => nil,
                   :desc => 'Specify record keys to be removed by a string separated by , (comma).'
      config_param :keep_keys, :string, :default => nil,
                   :desc => 'Specify record keys to be kept by a string separated by , (comma).'
      config_param :renew_record, :bool, :default => false,
                   :desc => 'Creates an output record newly without extending (merging) the input record fields.'
      config_param :renew_time_key, :string, :default => nil,
                   :desc => 'Overwrites the time of events with a value of the record field.'
      # it should always be true
      config_param :enable_ruby, :bool, :default => true, # true for lower version compatibility
                   :desc => 'Enable to use ruby codes in placeholders.'
      # it should always be true
      config_param :auto_typecast, :bool, :default => true, # false for lower version compatibility
                   :desc => 'Automatically cast the field types.'

      # config_params still being tuned: they allow any number of conditions
      # (up to PATTERN_MAX_NUM) to be declared, each selecting its own tag.
      config_param :tag1, :string,
                   :desc => 'Specify the output tag name when the condition1 is true.'
      config_param :condition1, :string,
                   :desc => 'Specify the condition1 to evaluate'

      (2..PATTERN_MAX_NUM).each do |i|
        config_param :"tag#{i}", :string, default: nil,
                     desc: "Specify tag#{i} (not necessary)"
        config_param :"condition#{i}", :string, default: nil, # NAME REGEXP
                     desc: "Specify the condition#{i} to evaluate (not necessary)"
      end

      config_param :tag_else, :string,
                   :desc => 'Specify the output tag name when the no conditions are true.'

      # Top-level keys accepted in addition to tagN / conditionN.
      BUILTIN_CONFIGURATIONS = %W(@id @type @label type output_tag remove_keys renew_record keep_keys enable_ruby renew_time_key auto_typecast tag_else record_else).freeze

      # Validates the configuration and precompiles tags and record maps.
      #
      # @param conf [Fluent::Config::Element] plugin configuration
      # @raise [Fluent::ConfigError] on unknown keys, on a tag/condition count
      #   mismatch, on `keep_keys` without `renew_record`, or when
      #   `enable_ruby` is false
      def configure(conf)
        super

        # Raise on any key that is neither built in nor a tagN / conditionN.
        conf.each_pair do |k, v|
          next if BUILTIN_CONFIGURATIONS.include?(k) || k =~ /\Acondition\d\d?\z/ || k =~ /\Atag\d\d?\z/
          raise Fluent::ConfigError, "out_condition_checker: some weird config is set {#{k}:#{v}}"
        end

        # Read the tags; they are paired with the conditions by position.
        tags = (1..PATTERN_MAX_NUM).map { |i| conf["tag#{i}"] }.compact

        # Read the conditions, remembering which numbers are defined.
        condition_numbers = (1..PATTERN_MAX_NUM).select { |i| conf["condition#{i}"] }
        @conditions = condition_numbers.map { |i| conf["condition#{i}"] }

        if tags.size != @conditions.size
          raise Fluent::ConfigError, "match the numbers of tags and conditions; number of tags: #{tags.size}, number of conditions: #{@conditions.size}"
        end

        # maps[idx] is the <recordN> section of @conditions[idx] (nil when the
        # section is absent). Keeping the list positional keeps it aligned
        # with @conditions and @tags even when condition numbers are not
        # contiguous. Only sections whose conditionN is defined are read.
        maps = condition_numbers.map { |i| read_record_section(conf, "record#{i}") }
        map_else = read_record_section(conf, 'record_else') || {}

        @remove_keys = @remove_keys.split(',') if @remove_keys

        if @keep_keys
          raise Fluent::ConfigError, 'out_condition_checker: `renew_record` must be true to use `keep_keys`' unless @renew_record
          @keep_keys = @keep_keys.split(',')
        end

        # Conditions are always evaluated as Ruby code and this file defines
        # no non-Ruby expander, so refuse the setting with a clear error.
        unless @enable_ruby
          raise Fluent::ConfigError, 'out_condition_checker: `enable_ruby` must be true; conditions and placeholders are evaluated as ruby code'
        end

        # require utilities which would be used in ruby placeholders
        require 'pathname'
        require 'uri'
        require 'cgi'
        @placeholder_expander = RubyPlaceholderExpander.new(
          :log => log,
          :auto_typecast => @auto_typecast, # It should always be true
        )

        @maps = @placeholder_expander.preprocess_map(maps)
        @tags = @placeholder_expander.preprocess_map(tags)
        @tag_else = @placeholder_expander.preprocess_map(conf['tag_else'])
        @map_else = @placeholder_expander.preprocess_map(map_else)

        @hostname = Socket.gethostname
      end

      # Re-tags (and optionally rewrites) every event of the stream according
      # to the first truthy condition, then emits it through the router.
      # Any StandardError raised while handling the stream is logged as a
      # warning and ends processing of that stream.
      #
      # @param tag [String] tag of the incoming event stream
      # @param es [#each] event stream yielding (time, record) pairs
      def process(tag, es)
        tag_parts = tag.split('.')
        placeholder_values = {
          'tag' => tag,
          'tags' => tag_parts, # for old version compatibility
          'tag_parts' => tag_parts,
          'tag_prefix' => tag_prefix(tag_parts),
          'tag_suffix' => tag_suffix(tag_parts),
          'hostname' => @hostname,
        }

        es.each do |time, record|
          placeholder_values['time'] = @placeholder_expander.time_value(time)
          placeholder_values['record'] = record

          # TODO: the way conditions are evaluated here should be improved.
          result, idx = evaluate_condition(@conditions, placeholder_values)
          placeholder_values['result'] = result

          new_tag, new_record =
            if idx
              reform(@tags[idx], @maps[idx], record, placeholder_values)
            else
              # TODO: add an option to disable tag_else.
              reform(@tag_else, @map_else, record, placeholder_values)
            end
          next unless new_tag

          if @renew_time_key && new_record.has_key?(@renew_time_key)
            time = new_record[@renew_time_key].to_i
          end
          @remove_keys.each { |k| new_record.delete(k) } if @remove_keys
          router.emit(new_tag, time, new_record)
        end
      rescue => e
        log.warn "condition_checker: #{e.class} #{e.message} #{e.backtrace.first}"
      end

      private

      # Collects the key/value pairs of every child section called `name`.
      # When the section appears more than once, the last one wins.
      #
      # @param conf [Fluent::Config::Element] plugin configuration
      # @param name [String] section name, e.g. "record1" or "record_else"
      # @return [Hash, nil] parsed fields, or nil when no such section exists
      def read_record_section(conf, name)
        fields = nil
        conf.elements.each do |element|
          next unless element.name == name
          fields = {}
          element.each_pair do |k, v|
            element.has_key?(k) # to suppress unread configuration warning
            fields[k] = parse_value(v)
          end
        end
        fields
      end

      # Finds the first condition whose evaluation is truthy.
      #
      # @param conditions [Array<String>] Ruby expressions, in priority order
      # @param placeholders [Hash] values exposed to the evaluated code
      # @return [Array] [result, index] of the first truthy condition, or
      #   [nil, nil] when none is truthy
      def evaluate_condition(conditions, placeholders)
        conditions.each_with_index do |condition, idx|
          result = expand_placeholders(condition, placeholders)
          return [result, idx] if result
        end
        [nil, nil]
      end

      # Parses a configuration value as JSON when it looks like an object or
      # an array; otherwise, or when parsing fails, returns the raw string.
      #
      # @param value_str [String]
      # @return [Hash, Array, String]
      def parse_value(value_str)
        if value_str.start_with?('{', '[')
          JSON.parse(value_str)
        else
          value_str
        end
      rescue => e
        log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", :error_class => e.class, :error => e.message
        value_str # emit as string
      end

      # Builds the outgoing tag and record for one event.
      #
      # @param tag [String] preprocessed tag template
      # @param map [Hash, nil] preprocessed record map, nil for "no fields"
      # @param record [Hash] incoming record (not modified)
      # @param placeholder_values [Hash]
      # @return [Array] [new_tag, new_record]
      def reform(tag, map, record, placeholder_values)
        placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

        new_tag = expand_placeholders(tag, placeholders)

        new_record = @renew_record ? {} : record.dup
        @keep_keys.each { |k| new_record[k] = record[k] } if @keep_keys && @renew_record
        new_record.merge!(create_record(map, placeholders)) if map

        [new_tag, new_record]
      end

      # Expands every key and value of a record map.
      #
      # Values go through expand_placeholders so that nested Hash/Array
      # values (produced by parse_value) are expanded element by element
      # instead of being handed to the evaluator as a whole.
      #
      # @param map [Hash] preprocessed record map
      # @param placeholders [Hash]
      # @return [Hash]
      def create_record(map, placeholders)
        map.each_with_object({}) do |(k, v), new_record|
          new_key = @placeholder_expander.expand(k, placeholders, true)
          new_record[new_key] = convert_num(expand_placeholders(v, placeholders))
        end
      end

      # Casts the strings "true"/"false" to booleans and integer-looking
      # strings to Integer. Any non-String value is returned untouched, so
      # booleans, hashes and arrays coming out of a placeholder are safe.
      #
      # @param value [Object]
      # @return [Object]
      def convert_num(value)
        return value unless value.is_a?(String)

        case value
        when "true" then true
        when "false" then false
        else
          value.to_i.to_s == value ? value.to_i : value
        end
      end

      # Recursively expands placeholders in strings, hash keys/values and
      # array elements; any other value is returned as is.
      #
      # @param value [String, Hash, Array, Object]
      # @param placeholders [Hash]
      # @return [Object]
      def expand_placeholders(value, placeholders)
        case value
        when String
          @placeholder_expander.expand(value, placeholders)
        when Hash
          value.each_with_object({}) do |(k, v), expanded|
            new_key = @placeholder_expander.expand(k, placeholders, true)
            expanded[new_key] = expand_placeholders(v, placeholders)
          end
        when Array
          value.map { |v| expand_placeholders(v, placeholders) }
        else
          value
        end
      end

      # @param tag_parts [Array<String>] e.g. ["a", "b", "c"]
      # @return [Array<String>] cumulative prefixes, e.g. ["a", "a.b", "a.b.c"]
      def tag_prefix(tag_parts)
        (1..tag_parts.size).map { |n| tag_parts.first(n).join('.') }
      end

      # @param tag_parts [Array<String>] e.g. ["a", "b", "c"]
      # @return [Array<String>] cumulative suffixes, e.g. ["a.b.c", "b.c", "c"]
      def tag_suffix(tag_parts)
        (0...tag_parts.size).map { |n| tag_parts.drop(n).join('.') }
      end

      # Expands templates by evaluating them as Ruby code.
      #
      # THIS CLASS MUST BE THREAD-SAFE
      class RubyPlaceholderExpander
        attr_reader :log

        # @param params [Hash] :log (logger) and :auto_typecast (Boolean)
        def initialize(params)
          @log = params[:log]
          @auto_typecast = params[:auto_typecast]
          @cleanroom_expander = CleanroomExpander.new
        end

        # @param time [Object] event time as accepted by Time.at
        # @return [Time]
        def time_value(time)
          Time.at(time)
        end

        # Preprocess record map to convert into ruby string expansion
        #
        # @param [Hash|String|Array] value record map config
        # @param [Boolean] force_stringify the value must be string, used for hash key
        def preprocess_map(value, force_stringify = false)
          case value
          when String
            bare_expression = nil
            if @auto_typecast && !force_stringify
              num_placeholders = value.scan('${').size
              if num_placeholders == 1 && value.start_with?('${') && value.end_with?('}')
                bare_expression = value[2..-2] # ${..} => ..
              end
            end
            bare_expression || "%Q[#{value.gsub('${', '#{')}]" # xx${..}xx => %Q[xx#{..}xx]
          when Hash
            value.each_with_object({}) do |(k, v), converted|
              converted[preprocess_map(k, true)] = preprocess_map(v)
            end
          when Array
            value.map { |v| preprocess_map(v) }
          else
            value
          end
        end

        # FIXME: this only returns its argument and could be removed.
        def prepare_placeholders(placeholder_values)
          placeholder_values
        end

        # Expand string with placeholders
        #
        # @param [String] str preprocessed Ruby source to evaluate
        # @param [Hash] placeholders values exposed to the evaluated code
        # @param [Boolean] force_stringify accepted for call compatibility, unused
        # @return [Object, nil] evaluation result, or nil (after logging a
        #   warning) when the evaluation raises
        def expand(str, placeholders, force_stringify = false)
          # FIXME: the tag information does not seem to be used; drop it if unnecessary.
          @cleanroom_expander.expand(
            str,
            placeholders['tag'],
            placeholders['time'],
            placeholders['record'],
            placeholders['tag_parts'],
            placeholders['tag_prefix'],
            placeholders['tag_suffix'],
            placeholders['hostname']
          )
        rescue => e
          log.warn "condition_checker: failed to expand `#{str}`", :error_class => e.class, :error => e.message
          log.warn_backtrace
          nil
        end

        # Evaluation context stripped of most Object methods, so names used
        # in the evaluated code resolve to the method arguments below or, via
        # method_missing, to record fields.
        class CleanroomExpander
          # The parameter names are part of the contract: the evaluated code
          # refers to them (tag, time, record, ...) as local variables.
          # NOTE: the code comes from the plugin configuration and is executed
          # with instance_eval, so the configuration must be trusted.
          def expand(__str_to_eval__, tag, time, record, tag_parts, tag_prefix, tag_suffix, hostname, force_stringify = true)
            Thread.current[:record_reformer_record] = record # for old version compatibility
            instance_eval(__str_to_eval__)
          end

          # for old version compatibility: a bare identifier resolves to the
          # record field of the same name.
          def method_missing(name)
            key = name.to_s
            record = Thread.current[:record_reformer_record]
            raise NameError, "undefined local variable or method `#{key}'" unless record.has_key?(key)

            record[key]
          end

          (Object.instance_methods).each do |m|
            undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/
          end
        end
      end
    end
  end
end