# # Copyright 2016 Google Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require 'fluent/plugin/exception_detector' require 'fluent/output' module Fluent # This output plugin consumes a log stream of JSON objects which contain # single-line log messages. If a consecutive sequence of log messages form # an exception stack trace, they forwarded as a single, combined JSON # object. Otherwise, the input log data is forwarded as is. class DetectExceptionsOutput < Output desc 'The field which contains the raw message text in the input json data.' config_param :message, :string, default: '' desc 'The prefix to be removed from the input tag when outputting a record.' config_param :remove_tag_prefix, :string, default: '' desc 'The interval of flushing the buffer for multiline format.' config_param :multiline_flush_interval, :time, default: nil desc 'Programming languages for which to detect exceptions. Default: all.' config_param :languages, :array, value_type: :string, default: [] desc 'Maximum number of lines to flush (0 means no limit). Default: 1000.' config_param :max_lines, :integer, default: 1000 desc 'Maximum number of bytes to flush (0 means no limit). Default: 0.' config_param :max_bytes, :integer, default: 0 Fluent::Plugin.register_output('detect_exceptions', self) def configure(conf) super if multiline_flush_interval @check_flush_interval = [multiline_flush_interval * 0.1, 1].max end @languages = languages.map(&:to_sym) # Maps log stream tags to a corresponding TraceAccumulator. @accumulators = {} end def start super if multiline_flush_interval @flush_buffer_mutex = Mutex.new @stop_check = false @thread = Thread.new(&method(:check_flush_loop)) end end def before_shutdown flush_buffers super if defined?(super) end def shutdown # Before shutdown is not available in older fluentd versions. # Hence, we make sure that we flush the buffers here as well. flush_buffers @thread.join if multiline_flush_interval super end def emit(tag, es, chain) es.each do |time_sec, record| process_record(tag, time_sec, record) end chain.next end private def process_record(tag, time_sec, record) synchronize do unless @accumulators.key?(tag) out_tag = tag.sub(/^#{Regexp.escape(remove_tag_prefix)}\./, '') @accumulators[tag] = Fluent::TraceAccumulator.new(message, @languages, max_lines: max_lines, max_bytes: max_bytes) do |t, r| router.emit(out_tag, t, r) end end @accumulators[tag].push(time_sec, record) end end def flush_buffers synchronize do @stop_check = true @accumulators.each_value(&:force_flush) end end def check_flush_loop @flush_buffer_mutex.synchronize do loop do @flush_buffer_mutex.sleep(@check_flush_interval) now = Time.now break if @stop_check @accumulators.each_value do |acc| needs_flush = now - acc.buffer_start_time > multiline_flush_interval acc.force_flush if needs_flush end end end rescue log.error 'error in check_flush_loop', error: $ERROR_INFO.to_s log.error_backtrace end def synchronize(&block) if multiline_flush_interval @flush_buffer_mutex.synchronize(&block) else yield end end end end