# frozen_string_literal: true
#
# Copyright 2020- WallyNegima
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'fluent/plugin/output'
module Fluent
module Plugin
class RecordsMergerOutput < Fluent::Plugin::Output
Fluent::Plugin.register_output('records_merger', self)
helpers :event_emitter

# Highest N accepted for the sub_tagN parameters (sub_tag1 .. sub_tag20).
PATTERN_MAX_NUM = 20

config_param :tag, :string, default: 'merged',
             desc: 'Specify the output tag'
config_param :main_tag, :string,
             desc: 'Specify main_tag name'
# sub_tag1 is required; sub_tag2 .. sub_tagN are optional.
# Values are exact tag names (incoming tags are compared with ==).
config_param :sub_tag1, :string,
             desc: 'Specify sub_tag name'
(2..PATTERN_MAX_NUM).each do |i|
  config_param :"sub_tag#{i}", :string, default: nil,
               desc: 'Specify sub_tag name'
end
# Defaults to true; set to false for compatibility with older versions.
config_param :auto_typecast, :bool, default: true,
             desc: 'Automatically cast the field types.'
config_param :merge_timing, :enum, list: %i[before after simple], default: 'before',
             desc: 'Choose merge timing before/after/simple'
# Maximum allowed gap (in seconds) between the newest and the oldest stored
# record; older records are discarded before merging. nil or 0 disables it.
config_param :tolerable_time_range, :integer, default: nil,
             desc: 'Specify tolerable time range for merging'
# NOTE: not implemented yet -- the value is accepted but currently unused.
config_param :force_emit, :bool, default: false,
             desc: 'Specify to emit or not when exceeded the tolerable_time_range'
# Derive the runtime state of the plugin from its configuration.
#
# `super` assigns every config_param to its instance variable (@tag,
# @main_tag, @sub_tag1..@sub_tag20, @merge_timing, @tolerable_time_range,
# @force_emit, @auto_typecast) with defaults and type conversion applied.
# This method builds on those values rather than re-reading raw `conf`
# strings, so e.g. an unset `tag` keeps its default 'merged' and
# `force_emit` stays a boolean.
#
# @param conf [Fluent::Config::Element] the <match> section
def configure(conf)
  super

  # Configured sub tags in numeric order. Unset slots are skipped, so the
  # n-th entry of @sub_tags is what the ${subN...} placeholders refer to.
  @sub_tags = (1..PATTERN_MAX_NUM).map { |i| instance_variable_get("@sub_tag#{i}") }.compact
  # The enum type may hand back a Symbol; process() compares against Strings.
  @merge_timing = @merge_timing.to_s
  # nil (not configured) becomes 0, which disables the time-gap check.
  @tolerable_time_range = @tolerable_time_range.to_i
  log.debug "records_merger: merge_timing=#{@merge_timing} sub_tags=#{@sub_tags.size}"

  # Latest record per tag (key: tag). Not cleared after a merge is emitted.
  @last_records = {}
  # Arrival flags: index 0 = main tag, index i = i-th entry of @sub_tags.
  @flags4merge = Array.new(@sub_tags.size + 1, 0)

  # Collect <record> directives (logic taken from out-record-reformer).
  map = {}
  conf.elements.select { |element| element.name == 'record' }.each do |element|
    element.each_pair do |k, v|
      element.key?(k) # to suppress unread configuration warning
      map[k] = parse_value(v)
    end
  end

  @placeholder_expander = PlaceholderExpander.new(
    log: log,
    auto_typecast: @auto_typecast
  )
  @map = @placeholder_expander.preprocess_map(map)
  @tag = @placeholder_expander.preprocess_map(@tag)
  @hostname = Socket.gethostname
end
# Lifecycle hook: this plugin has nothing of its own to start, so it simply
# delegates to the superclass (Fluent::Plugin::Output).
def start
  super()
end
# Lifecycle hook: this plugin has nothing of its own to tear down, so it
# simply delegates to the superclass (Fluent::Plugin::Output).
def shutdown
  super()
end
# Route one incoming event stream according to the configured merge_timing.
#
# Every configured tag owns one slot in @flags4merge (slot 0 = main_tag,
# slot i = i-th entry of @sub_tags). A slot becomes 1 once a record with that
# tag has been stored in @last_records; when every slot is 1 a merged event
# is emitted through emit_new_event.
#
# * 'before' - sub tags are expected first. The main tag completes the merge;
#   if any sub tag is still missing when it arrives, all flags are reset.
# * 'after'  - the main tag is expected first. A sub tag arriving while the
#   main flag is unset resets all flags; the sub tag that completes the set
#   triggers the merge.
# * 'simple' - arrival order does not matter; the merge is emitted as soon as
#   every tag has been seen.
#
# Tags that are neither main_tag nor a sub tag are logged and ignored.
# StandardErrors raised while processing are logged and swallowed.
#
# @param tag [String] tag of the incoming events
# @param es [#each] event stream yielding (time, record) pairs
def process(tag, es)
  slot = merge_slot_for(tag)
  if slot.nil?
    warn_unexpected_tag(tag)
    return
  end

  case @merge_timing.to_s
  when 'before'
    record_arrival(slot, tag, es)
    # Only the main tag can complete a 'before' merge.
    if slot.zero?
      if merge_ready?
        emit_new_event(es)
      else
        reset_merge_flags
      end
    end
  when 'after'
    if slot.zero? || @flags4merge[0] == 1
      record_arrival(slot, tag, es)
      # Only a sub tag can complete an 'after' merge.
      emit_new_event(es) if slot.positive? && merge_ready?
    else
      # A sub tag arrived before the main tag: start the cycle over.
      reset_merge_flags
    end
  when 'simple'
    record_arrival(slot, tag, es)
    emit_new_event(es) if merge_ready?
  end
rescue StandardError => e
  log.warn "record_reformer: #{e.class} #{e.message} #{e.backtrace&.first}"
  # Only instance state is in scope here (no per-call locals).
  log.debug "record_reformer: tag:#{@tag} map:#{@map} record:#{@last_records}"
end

# Index into @flags4merge for +tag+: 0 for main_tag, i + 1 for the i-th sub
# tag, nil when the tag is not configured. main_tag wins if it is also listed
# as a sub tag.
private def merge_slot_for(tag)
  return 0 if tag == @main_tag

  index = @sub_tags.index(tag)
  index && index + 1
end

# Store +es+ as the latest record for +tag+, mark its slot as received and,
# when tolerable_time_range is positive, drop records that became too old.
private def record_arrival(slot, tag, es)
  store_to_lastrecords(tag, es)
  @flags4merge[slot] = 1
  check_timegap if @tolerable_time_range.to_i > 0
end

# True once every configured tag has been received in the current cycle.
private def merge_ready?
  @flags4merge.all? { |flag| flag == 1 }
end

# Clear every arrival flag so the next merge cycle starts from scratch.
private def reset_merge_flags
  @flags4merge.fill(0)
end

# Report a tag that matches neither main_tag nor any configured sub tag.
private def warn_unexpected_tag(tag)
  log.warn "records_merger: unexpected tag '#{tag}'; it is neither main_tag nor a configured sub_tag"
end
# NOTE(review): translated author note -- "this was brought over as-is, so it
# might be better to rewrite it?"
private

# Remember the most recent record of +es+ for +tag+ in @last_records.
#
# Each event is stored as a copy of its record with an extra 'time' key that
# holds the event time, so a later event in +es+ overwrites an earlier one.
# The caller's record is not modified, because event records may be shared
# with other outputs.
#
# @param tag [String] tag of the event stream
# @param es [#each] event stream yielding (time, record) pairs
def store_to_lastrecords(tag, es)
  es.each do |time, record|
    @last_records[tag] = record.merge('time' => time)
  end
end
# Build the placeholder source values for the current merge.
#
# Main-tag entries are always present: 'main' (the stored main record, nil
# if none), 'main_tag', 'main_tags' / 'main_tag_parts' (the tag split on
# '.'), 'main_tag_prefix', 'main_tag_suffix' and 'hostname'. For every sub
# tag that has a stored record, the same family is added under "subN"
# ("subN", "subN_tag", "subN_tags", ...), where N is the 1-based position
# in @sub_tags.
#
# @return [Hash{String => Object}]
def generate_placeholders
  main_parts = @main_tag.split('.')
  values = {
    'main' => @last_records[@main_tag],
    'main_tag' => @main_tag,
    'main_tags' => main_parts,
    'main_tag_parts' => main_parts,
    'main_tag_prefix' => tag_prefix(main_parts),
    'main_tag_suffix' => tag_suffix(main_parts),
    'hostname' => @hostname
  }

  @sub_tags.first(PATTERN_MAX_NUM).each.with_index(1) do |sub_tag, n|
    next unless @last_records.key?(sub_tag)

    sub_parts = sub_tag.split('.')
    prefixes = tag_prefix(sub_parts)
    suffixes = tag_suffix(sub_parts)
    values["sub#{n}"] = @last_records[sub_tag]
    values["sub#{n}_tag"] = sub_tag
    values["sub#{n}_tags"] = sub_parts
    values["sub#{n}_tag_parts"] = sub_parts
    values["sub#{n}_tag_prefix"] = prefixes
    values["sub#{n}_tag_suffix"] = suffixes
  end

  values
end
# Emit the merged record built from @last_records once per event in +es+.
#
# Each emitted record is produced by reform from @tag and @map and is
# emitted with the time of the corresponding event in +es+ (the event's own
# record is ignored). After every successful emit the arrival flags are
# cleared so a new merge cycle starts. Nothing happens when no record has
# been stored yet, and an event is skipped when the expanded tag is nil.
#
# @param es [#each] event stream yielding (time, record) pairs
def emit_new_event(es)
  return if @last_records.empty?

  # Placeholder values depend only on @last_records, which does not change
  # inside the loop, so they are computed once.
  placeholder_values = generate_placeholders
  es.each do |time, _record|
    # reform builds a fresh record per event, so emitted records are not shared.
    new_tag, new_record = reform(@tag, @last_records, placeholder_values)
    next unless new_tag

    router.emit(new_tag, time, new_record)
    @flags4merge.fill(0)
  end
end
# Drop stored records that are too old compared with the newest one.
#
# Uses the event time stored under 'time' in each @last_records entry, in
# whole seconds. Every record more than @tolerable_time_range seconds older
# than the newest record is removed and the arrival flag of its tag is
# cleared, so it cannot take part in a merge. With fewer than two stored
# records there is nothing to compare.
def check_timegap
  return if @last_records.size < 2

  # #to_i gives epoch seconds for Integer, Fluent::EventTime and Time alike.
  # (#sec does not exist on Integer and is only the second-of-minute on Time.)
  seconds = @last_records.map { |tag, record| [tag, record['time'].to_i] }
  newest = seconds.map(&:last).max

  seconds.each do |tag, sec|
    next if newest - sec <= @tolerable_time_range

    @last_records.delete(tag)
    if tag == @main_tag
      @flags4merge[0] = 0
    elsif (index = @sub_tags.index(tag))
      @flags4merge[index + 1] = 0
    end
    log.debug "records_merger: #{tag} is too old! so deleted"
  end
end
# Parse a <record> value: strings starting with '{' or '[' are decoded as
# JSON, anything else is returned unchanged. When decoding (or anything else
# here) raises, a warning is logged and the raw string is used instead.
#
# @param value_str [String]
# @return [Object] decoded JSON or +value_str+ itself
def parse_value(value_str)
  return value_str unless value_str.start_with?('{', '[')

  JSON.parse(value_str)
rescue StandardError => e
  log.warn "failed to parse #{value_str} as json. Assuming #{value_str} is a string", error_class: e.class, error: e.message
  value_str # emit as string
end
# Expand @tag-style +tag+ and the configured @map into the merged output.
#
# The record argument is ignored: the output record is always built from
# scratch out of @map, with placeholders resolved against
# +placeholder_values+.
#
# @param tag [String, nil] tag template (may contain ${...} placeholders)
# @param _record [Object] unused
# @param placeholder_values [Hash] values from generate_placeholders
# @return [Array(Object, Hash)] expanded tag and freshly built record
def reform(tag, _record, placeholder_values)
  table = @placeholder_expander.prepare_placeholders(placeholder_values)
  expanded_tag = expand_placeholders(tag, table)
  expanded_record = {}.merge(expand_placeholders(@map, table))
  [expanded_tag, expanded_record]
end
# Recursively expand placeholders in +value+.
#
# * String -> expanded by the PlaceholderExpander
# * Hash   -> keys expanded as strings (force_stringify), values recursively
# * Array  -> each element expanded recursively
# * other  -> returned as-is
#
# @param value [Object] template value from the tag or <record> map
# @param placeholders [Hash] table from prepare_placeholders
# @return [Object] a new expanded structure
def expand_placeholders(value, placeholders)
  case value
  when String
    @placeholder_expander.expand(value, placeholders)
  when Hash
    value.each_with_object({}) do |(key, nested), expanded|
      new_key = @placeholder_expander.expand(key, placeholders, true)
      expanded[new_key] = expand_placeholders(nested, placeholders)
    end
  when Array
    value.map { |element| expand_placeholders(element, placeholders) }
  else
    value
  end
end
# Cumulative dotted prefixes of a split tag.
#   tag_prefix(%w[a b c]) #=> ["a", "a.b", "a.b.c"]
#
# @param tag_parts [Array<String>] tag split on '.'
# @return [Array<String>] empty when +tag_parts+ is empty
def tag_prefix(tag_parts)
  return [] if tag_parts.empty?

  head, *rest = tag_parts
  rest.each_with_object([head]) do |part, prefixes|
    prefixes << "#{prefixes.last}.#{part}"
  end
end
# Dotted suffixes of a split tag, longest first.
#   tag_suffix(%w[a b c]) #=> ["a.b.c", "b.c", "c"]
#
# @param tag_parts [Array<String>] tag split on '.'
# @return [Array<String>] empty when +tag_parts+ is empty
def tag_suffix(tag_parts)
  return [] if tag_parts.empty?

  *init, tail = tag_parts
  suffixes = [tail]
  init.reverse_each { |part| suffixes << "#{part}.#{suffixes.last}" }
  suffixes.reverse
end
# THIS CLASS MUST BE THREAD-SAFE
# (Its only state, @log and @auto_typecast, is assigned once in initialize.)
#
# Expands ${...} placeholders such as ${main_tag}, ${sub1_tag_parts[0]} or
# ${main["field"]} in the output tag and <record> values; this is what makes
# ${tag}-style placeholders usable in this plugin.
class PlaceholderExpander
  attr_reader :placeholders, :log

  # @param params [Hash] :log (logger used for warnings) and :auto_typecast
  #   (when truthy, a value consisting of exactly one placeholder returns the
  #   placeholder's value as-is instead of a string)
  def initialize(params)
    @log = params[:log]
    @auto_typecast = params[:auto_typecast]
  end

  # @return [String] +time+ (seconds since the epoch) formatted by Time#to_s
  def time_value(time)
    Time.at(time).to_s
  end

  # Hook for preprocessing a map before expansion; returns +value+ unchanged.
  def preprocess_map(value, _force_stringify = false)
    value
  end

  # Turn placeholder source values into a lookup table keyed by the literal
  # placeholder text:
  # * Array -> "${key[i]}" and the negative form "${key[i - size]}"
  # * Hash  -> "${key["field"]}", plus "${field}" unless +field+ is itself a
  #            top-level key (so reserved names are not overwritten)
  # * other -> "${key}"
  #
  # @param placeholder_values [Hash]
  # @return [Hash{String => Object}]
  def prepare_placeholders(placeholder_values)
    placeholder_values.each_with_object({}) do |(name, value), table|
      case value
      when Array
        count = value.size
        value.each_with_index do |item, pos|
          table["${#{name}[#{pos}]}"] = item
          table["${#{name}[#{pos - count}]}"] = item # support [-1]
        end
      when Hash
        value.each do |field, item|
          table["${#{field}}"] = item unless placeholder_values.key?(field)
          table[%(${#{name}["#{field}"]})] = item
        end
      else
        table["${#{name}}"] = value
      end
    end
  end

  # Expand the placeholders in +str+.
  #
  # Unknown placeholders are logged and expand to nil (an empty string when
  # substituted inside a larger string).
  #
  # @param str [String]
  # @param placeholders [Hash] table from prepare_placeholders
  # @param force_stringify [Boolean] the result must be a String (used for
  #   hash keys); disables the auto_typecast shortcut
  # @return [Object] the placeholder's raw value when auto_typecast applies,
  #   otherwise a String
  def expand(str, placeholders, force_stringify = false)
    if @auto_typecast && !force_stringify
      whole = str.match(/\A(\${[^}]+}|__[A-Z_]+__)\z/)
      if whole
        log_if_unknown_placeholder(whole[1], placeholders)
        return placeholders[whole[1]]
      end
    end

    str.gsub(/(\${[^}]+}|__[A-Z_]+__)/) do |token|
      log_if_unknown_placeholder(token, placeholders)
      placeholders[token]
    end
  end

  private

  # Warn when +placeholder+ has no entry in +placeholders+.
  def log_if_unknown_placeholder(placeholder, placeholders)
    return if placeholders.include?(placeholder)

    log.warn "record_reformer: unknown placeholder `#{placeholder}` found"
  end
end
end
end
end