# # Copyright 2020- LIN LI # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require 'fluent/plugin/filter' require 'json' require 'sentry-ruby' require 'audit_log_parser' module Fluent module Plugin class LightCoreFilter < Fluent::Plugin::Filter Fluent::Plugin.register_filter('light_core', self) # 通知相关配置 config_param :sentry, :bool, default: false config_param :sentry_dsn, :string, default: '' # Application 异常 settings config_param :app_stream, :string, default: 'stderr' config_param :app_message, :array, default: [], value_type: :regexp config_param :app_status, :array, default: ['500'], value_type: :string config_param :app_elapsed, :float, default: 2000 # Nginx 异常 settings config_param :lb_stream, :string, default: 'stderr' config_param :lb_code, :array, default: ['400', '500'], value_type: :string config_param :lb_elapsed, :float, default: 3 config_param :lb_ignore, :array, default: [], value_type: :string # MongoDB 异常 settings config_param :mongo_severity, :array, default: ['F', 'E'], value_type: :string config_param :mongo_querytime, :float, default: 100 config_param :mongo_dataBytes, :float, default: 1073741824 config_param :mongo_reslen, :float, default: 20971520 # 初始化 Sentry def start super if @sentry log.info('init sentry') Sentry.init do |config| config.dsn = @sentry_dsn # To activate performance monitoring, set one of these options. # We recommend adjusting the value in production: config.traces_sample_rate = 1 # config.background_worker_threads = 2 config.transport.timeout = 10 config.transport.open_timeout = 10 end end end # 清理 def shutdown super end # 主处理 def filter(tag, time, record) # 应用 if ['app', 'service'].include? tag record = filter_app(tag, time, record) return unless record notice('app', record) end # 负载均衡 if ['lb'].include? tag record = filter_lb(tag, time, record) return unless record notice('lb', record) end # 数据库 if ['master', 'secondary', 'arbiter', 'third'].include? tag record = filter_mongo(tag, time, record) return unless record notice('mongo', record) end if ['syslog.messages', 'syslog.secure', 'syslog.audit'].include? tag record = filter_syslog(tag, time, record) return unless record end record['environment'] = ENV['FLUENTD_ENV'] record['node'] = ENV['NODE_IP'] # 其他 record end # Parse syslog def filter_syslog(tag, time, record) if (tag == 'syslog.audit') line = record['message'] return record unless line record = AuditLogParser.parse_line(line, flatten: false) record['time'] = Time.at(record["header"]["msg"][/[0-9]+/].to_i).to_s return record end record['time'] = Time.at(time).to_s return record end # Parse the application log def filter_app(tag, time, record) file = record['file'].split('/').last.split('_') # Parse log file name log = record['log'] # Get detailed log content # Set common items record['cid'] = file[0] # container id record['cname'] = file[0].split('-')[1] # container name record['ctime'] = record['time'] # container time # Delete useless content record.delete('log') record.delete('file') record.delete('time') # Standard AP log [2020-07-06T09:21:51.121] [A] [INFO] xxxxx if /^\[\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{3}\] \[A\]/.match(log) item = /^\[(?