lib/fluent/plugin/out_detect_exceptions.rb in fluent-plugin-detect-exceptions-0.0.14 vs lib/fluent/plugin/out_detect_exceptions.rb in fluent-plugin-detect-exceptions-0.0.15

- old
+ new

@@ -42,28 +42,26 @@ Fluent::Plugin.register_output('detect_exceptions', self) def configure(conf) super - if multiline_flush_interval - @check_flush_interval = [multiline_flush_interval * 0.1, 1].max - end + @check_flush_interval = [multiline_flush_interval * 0.1, 1].max if multiline_flush_interval @languages = languages.map(&:to_sym) # Maps log stream tags to a corresponding TraceAccumulator. @accumulators = {} end def start super - if multiline_flush_interval - @flush_buffer_mutex = Mutex.new - @stop_check = false - @thread = Thread.new(&method(:check_flush_loop)) - end + return unless multiline_flush_interval + + @flush_buffer_mutex = Mutex.new + @stop_check = false + @thread = Thread.new(&method(:check_flush_loop)) end def before_shutdown flush_buffers super if defined?(super) @@ -75,12 +73,12 @@ flush_buffers @thread.join if @multiline_flush_interval super end - def emit(tag, es, chain) - es.each do |time_sec, record| + def emit(tag, entries, chain) + entries.each do |time_sec, record| process_record(tag, time_sec, record) end chain.next end @@ -119,16 +117,17 @@ @flush_buffer_mutex.synchronize do loop do @flush_buffer_mutex.sleep(@check_flush_interval) now = Time.now break if @stop_check + @accumulators.each_value do |acc| acc.force_flush if now - acc.buffer_start_time > @multiline_flush_interval end end end - rescue + rescue StandardError log.error 'error in check_flush_loop', error: $ERROR_INFO.to_s log.error_backtrace end def synchronize(&block)