# frozen_string_literal: true

#
# Copyright 2020- WallyNegima
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'json'
require 'socket'

require 'fluent/plugin/output'

module Fluent
  module Plugin
    # Fluentd output plugin that remembers the latest record seen for one
    # "main" tag and up to PATTERN_MAX_NUM "sub" tags and, once every
    # configured tag has been seen, emits a single new event built from the
    # <record> section of the configuration with placeholders such as
    # ${main["key"]} or ${sub1["key"]} expanded.
    class RecordsMergerOutput < Fluent::Plugin::Output
      Fluent::Plugin.register_output('records_merger', self)

      helpers :event_emitter

      # Maximum number of sub_tagN parameters that can be configured.
      PATTERN_MAX_NUM = 20

      config_param :tag, :string, default: 'merged',
                                  desc: 'Specify the output tag'
      config_param :main_tag, :string,
                   desc: 'Specify main_tag name'
      (1..PATTERN_MAX_NUM).each do |i|
        config_param :"sub_tag#{i}", :string, default: nil, # NAME REGEXP
                                              desc: 'Specify sub_tag name'
      end
      config_param :auto_typecast, :bool, default: true, # false for lower version compatibility
                                          desc: 'Automatically cast the field types.'
      config_param :merge_timing, :enum, list: %i[before after simple], default: :simple,
                                         desc: 'Choose merge timing before/after'
      # When unset (nil) this is treated as 0, which disables the time-gap check.
      config_param :tolerable_time_range, :integer, default: nil,
                                                    desc: 'Specify tolerable time range for merging'
      config_param :condition, :string, default: nil,
                                        desc: 'emit if condition is true '
      config_param :keep_records, :bool, default: false,
                                         desc: 'keep latest records if true'
      # NOTE: force_emit is declared but not implemented; nothing in this file reads it.
      config_param :force_emit, :bool, default: false,
                                       desc: 'Specify to emit or not when exceeded the tolerable_time_range'

      # Builds the runtime state from the parsed configuration.
      #
      # The typed values produced by config_param (@tag, @main_tag,
      # @merge_timing, @condition, @keep_records, @force_emit) are used as they
      # are; they are deliberately NOT overwritten with the raw strings held in
      # conf, because that would discard defaults and turn "false" into a
      # truthy String.
      #
      # @param conf [Fluent::Config::Element] e.g.
      #   {"@type"=>"records_merger", "tag"=>"merged1", "main_tag"=>"accesslog.1.main",
      #    "sub_tag1"=>"accesslog.1.sub1", "sub_tag2"=>"accesslog.1.sub2"}
      def configure(conf)
        super

        # Latest record per tag, keyed by tag name.
        @last_records = {}
        # Sub tags in parameter order; unset sub_tagN parameters are skipped,
        # so positions are contiguous even when the numbering has gaps.
        @sub_tags = (1..PATTERN_MAX_NUM).map { |i| conf["sub_tag#{i}"] }.compact
        @tolerable_time_range = @tolerable_time_range.to_i
        # Index 0 is the main tag, index n is the n-th entry of @sub_tags.
        @flags4merge = Array.new(@sub_tags.size + 1, 0)

        # from out-record-reformer
        map = {}
        conf.elements.select { |element| element.name == 'record' }.each do |element|
          element.each_pair do |k, v|
            element.key?(k) # to suppress unread configuration warning
            map[k] = parse_value(v)
          end
        end

        @placeholder_expander = PlaceholderExpander.new(log: log, auto_typecast: @auto_typecast)
        @map = @placeholder_expander.preprocess_map(map)
        @tag = @placeholder_expander.preprocess_map(@tag)
        @hostname = Socket.gethostname
      end

      def start
        super
      end

      def shutdown
        super
      end

      # Receives an event stream for one tag, remembers its records and emits
      # a merged event when the configured merge_timing allows it:
      #
      # * :before - emit only when the main tag arrives and all flags are set
      # * :after  - sub tags are ignored until the main tag has been seen; emit
      #             only when a sub tag arrives and all flags are set
      # * :simple - emit whenever any configured tag arrives and all flags are set
      #
      # Any StandardError raised while processing is logged and swallowed.
      #
      # @param tag [String] tag of the incoming event stream
      # @param es [Fluent::EventStream] incoming events
      def process(tag, es)
        if tag == @main_tag
          track_arrival(tag, es, 0)
          emit_new_event(es) if @merge_timing != :after && all_flags_set?
        elsif (position = @sub_tags.index(tag))
          # In :after mode a sub record only counts once the main record exists.
          return if @merge_timing == :after && @flags4merge[0] != 1

          track_arrival(tag, es, position + 1)
          emit_new_event(es) if @merge_timing != :before && all_flags_set?
        else
          log.warn "something else has come! check it!#{tag}"
        end
      rescue StandardError => e
        log.warn "records_merger: #{e.class} #{e.message} #{e.backtrace.first}"
        log.debug "records_merger: tag:#{@tag} map:#{@map} record:#{@last_records}"
      end

      private

      # Stores the records of es for tag, marks the tag's flag as seen and,
      # when a tolerable_time_range is configured, drops a stale record.
      def track_arrival(tag, es, flag_index)
        store_to_lastrecords(tag, es)
        @flags4merge[flag_index] = 1
        check_timegap if @tolerable_time_range > 0
      end

      # True when the main tag and every sub tag currently have their flag set.
      def all_flags_set?
        @flags4merge.all? { |flag| flag == 1 }
      end

      # Remembers the last record of es under tag, with the event time added
      # under the 'time' key. A copy is stored so the incoming record itself is
      # not modified.
      def store_to_lastrecords(tag, es)
        es.each do |time, record|
          @last_records[tag] = record.merge('time' => time)
        end
      end

      # Builds the values that placeholders are generated from: the stored main
      # record and tag parts, the hostname, and the same set for every sub tag
      # that currently has a stored record (sub1, sub2, ... by position in
      # @sub_tags).
      #
      # @return [Hash{String => Object}]
      def generate_placeholders
        main_tag_parts = @main_tag.split('.')
        placeholder_values = {
          'main' => @last_records[@main_tag],
          'main_tag' => @main_tag,
          'main_tags' => main_tag_parts,
          'main_tag_parts' => main_tag_parts,
          'main_tag_prefix' => tag_prefix(main_tag_parts),
          'main_tag_suffix' => tag_suffix(main_tag_parts),
          'hostname' => @hostname
        }

        # Add the sub tag related values.
        @sub_tags.each_with_index do |sub_tag, index|
          next unless @last_records.key?(sub_tag)

          i = index + 1
          sub_tag_parts = sub_tag.split('.')
          placeholder_values.merge!(
            "sub#{i}" => @last_records[sub_tag],
            "sub#{i}_tag" => sub_tag,
            "sub#{i}_tags" => sub_tag_parts,
            "sub#{i}_tag_parts" => sub_tag_parts,
            "sub#{i}_tag_prefix" => tag_prefix(sub_tag_parts),
            "sub#{i}_tag_suffix" => tag_suffix(sub_tag_parts)
          )
        end
        placeholder_values
      end

      # For every event in es, builds the merged record from the stored records
      # and emits it with that event's time. Nothing is emitted when no record
      # is stored, when the expanded tag is nil, or when a condition is
      # configured and evaluates to false/nil. After each emit all flags are
      # cleared unless keep_records is enabled.
      def emit_new_event(es)
        es.each do |time, _record|
          next if @last_records.empty?

          placeholder_values = generate_placeholders
          new_tag, new_record = reform(@tag, @last_records, placeholder_values)
          next unless new_tag
          # Do not emit unless the condition holds.
          next if @condition && !condition_is_ok?(placeholder_values)

          router.emit(new_tag, time, new_record)
          # Reset the flags.
          @flags4merge.fill(0) unless @keep_records
        end
      end

      # Evaluates the configured condition after rewriting every main["key"] /
      # subN["key"] reference into a lookup in the prepared placeholders.
      #
      # NOTE(security): the condition string is executed with instance_eval, so
      # it must only ever come from a trusted configuration file.
      def condition_is_ok?(placeholder_values)
        # The local must be named `placeholders`: the evaluated string refers to it.
        placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)
        instance_eval(@condition.gsub(/((main\["\w+"\])|(sub[0-9]{1,2}\["\w+"\]))/, 'placeholders[\'${\1}\']'))
      end

      # When the newest and the oldest stored records are further apart than
      # tolerable_time_range, forgets the oldest record and clears its flag.
      # Times are compared through to_i so both Integer and time-like objects
      # responding to to_i are accepted.
      def check_timegap
        return unless @last_records.size > 1

        # Keep the tag name together with its time.
        tag_times = @last_records.map { |tag, record| [tag, record['time'].to_i] }
        _latest_tag, latest_time = tag_times.max_by { |_tag, seconds| seconds }
        oldest_tag, oldest_time = tag_times.min_by { |_tag, seconds| seconds }
        return unless latest_time - oldest_time > @tolerable_time_range

        @last_records.delete(oldest_tag)
        flag_index = oldest_tag == @main_tag ? 0 : @sub_tags.index(oldest_tag) + 1
        @flags4merge[flag_index] = 0
      end

      # Parses a <record> value as JSON when it looks like an object or array;
      # otherwise, or when parsing fails, returns the string unchanged.
      def parse_value(value_str)
        if value_str.start_with?('{', '[')
          JSON.parse(value_str)
        else
          value_str
        end
      rescue StandardError => e
        log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string",
                 error_class: e.class, error: e.message
        value_str # emit as string
      end

      # Expands the output tag and the <record> map with the given values.
      # The new record is always built from scratch, so the stored records are
      # never copied into it.
      #
      # @return [Array(Object, Hash)] the expanded tag and the new record
      def reform(tag, _record, placeholder_values)
        placeholders = @placeholder_expander.prepare_placeholders(placeholder_values)

        new_tag = expand_placeholders(tag, placeholders)
        new_record = {}
        new_record.merge!(expand_placeholders(@map, placeholders))

        [new_tag, new_record]
      end

      # Recursively expands placeholders in Strings, Hash keys and values, and
      # Array elements; any other value is returned unchanged.
      def expand_placeholders(value, placeholders)
        case value
        when String
          @placeholder_expander.expand(value, placeholders)
        when Hash
          value.each_with_object({}) do |(k, v), expanded|
            # Hash keys are always expanded as strings.
            new_key = @placeholder_expander.expand(k, placeholders, true)
            expanded[new_key] = expand_placeholders(v, placeholders)
          end
        when Array
          value.map { |v| expand_placeholders(v, placeholders) }
        else
          value
        end
      end

      # %w[a b c] => ["a", "a.b", "a.b.c"]
      def tag_prefix(tag_parts)
        tag_parts.each_index.map { |i| tag_parts[0..i].join('.') }
      end

      # %w[a b c] => ["a.b.c", "b.c", "c"]
      def tag_suffix(tag_parts)
        tag_parts.each_index.map { |i| tag_parts[i..-1].join('.') }
      end

      # THIS CLASS MUST BE THREAD-SAFE
      # Provides the ${...} placeholder syntax (for example ${tag}).
      class PlaceholderExpander
        attr_reader :placeholders, :log

        # @param params [Hash] :log (logger) and :auto_typecast (Boolean)
        def initialize(params)
          @log = params[:log]
          @auto_typecast = params[:auto_typecast]
        end

        def time_value(time)
          Time.at(time).to_s
        end

        # Returns value unchanged.
        def preprocess_map(value, _force_stringify = false)
          value
        end

        # Flattens placeholder_values into a "${...}" => value lookup table.
        #
        # * Array values produce "${key[0]}", "${key[1]}", ... and the matching
        #   negative indexes.
        # * Hash values produce %(${key["k"]}) for every entry and also "${k}"
        #   unless k is itself a top-level key of placeholder_values.
        # * Everything else produces "${key}".
        def prepare_placeholders(placeholder_values)
          placeholders = {}

          placeholder_values.each do |key, value|
            if value.is_a?(Array) # tag_parts, etc
              size = value.size
              value.each_with_index do |v, idx|
                placeholders.store("${#{key}[#{idx}]}", v)
                placeholders.store("${#{key}[#{idx - size}]}", v) # support [-1]
              end
            elsif value.is_a?(Hash) # record, etc
              value.each do |k, v|
                # prevent overwriting the reserved keys such as tag
                placeholders.store("${#{k}}", v) unless placeholder_values.key?(k)
                placeholders.store(%(${#{key}["#{k}"]}), v) # record["foo"]
              end
            else # string, integer, float, and others?
              placeholders.store("${#{key}}", value)
            end
          end

          placeholders
        end

        # Expand string with placeholders
        #
        # With auto_typecast enabled, a string consisting of exactly one
        # placeholder returns the placeholder's value as is (not stringified).
        #
        # @param [String] str
        # @param [Hash] placeholders table built by prepare_placeholders
        # @param [Boolean] force_stringify the value must be string, used for hash key
        def expand(str, placeholders, force_stringify = false)
          if @auto_typecast && !force_stringify
            single_placeholder_matched = str.match(/\A(\${[^}]+}|__[A-Z_]+__)\z/)
            if single_placeholder_matched
              name = single_placeholder_matched[1]
              log_if_unknown_placeholder(name, placeholders)
              return placeholders[name]
            end
          end
          str.gsub(/(\${[^}]+}|__[A-Z_]+__)/) do
            name = Regexp.last_match(1)
            log_if_unknown_placeholder(name, placeholders)
            placeholders[name]
          end
        end

        private

        def log_if_unknown_placeholder(placeholder, placeholders)
          return if placeholders.include?(placeholder)

          log.warn "records_merger: unknown placeholder `#{placeholder}` found"
        end
      end
    end
  end
end