lib/fluent/plugin/out_detect_exceptions.rb in fluent-plugin-detect-exceptions-0.0.14 vs lib/fluent/plugin/out_detect_exceptions.rb in fluent-plugin-detect-exceptions-0.0.15
- old
+ new
@@ -42,28 +42,26 @@
Fluent::Plugin.register_output('detect_exceptions', self)
def configure(conf)
super
- if multiline_flush_interval
- @check_flush_interval = [multiline_flush_interval * 0.1, 1].max
- end
+ @check_flush_interval = [multiline_flush_interval * 0.1, 1].max if multiline_flush_interval
@languages = languages.map(&:to_sym)
# Maps log stream tags to a corresponding TraceAccumulator.
@accumulators = {}
end
def start
super
- if multiline_flush_interval
- @flush_buffer_mutex = Mutex.new
- @stop_check = false
- @thread = Thread.new(&method(:check_flush_loop))
- end
+ return unless multiline_flush_interval
+
+ @flush_buffer_mutex = Mutex.new
+ @stop_check = false
+ @thread = Thread.new(&method(:check_flush_loop))
end
def before_shutdown
flush_buffers
super if defined?(super)
@@ -75,12 +73,12 @@
flush_buffers
@thread.join if @multiline_flush_interval
super
end
- def emit(tag, es, chain)
- es.each do |time_sec, record|
+ def emit(tag, entries, chain)
+ entries.each do |time_sec, record|
process_record(tag, time_sec, record)
end
chain.next
end
@@ -119,16 +117,17 @@
@flush_buffer_mutex.synchronize do
loop do
@flush_buffer_mutex.sleep(@check_flush_interval)
now = Time.now
break if @stop_check
+
@accumulators.each_value do |acc|
acc.force_flush if now - acc.buffer_start_time >
@multiline_flush_interval
end
end
end
- rescue
+ rescue StandardError
log.error 'error in check_flush_loop', error: $ERROR_INFO.to_s
log.error_backtrace
end
def synchronize(&block)