lib/logstash/outputs/file.rb in logstash-output-file-4.2.4 vs lib/logstash/outputs/file.rb in logstash-output-file-4.2.5

- old
+ new

@@ -92,15 +92,16 @@ else @file_root = File.dirname(path) end @failure_path = File.join(@file_root, @filename_failure) - - now = Time.now - @last_flush_cycle = now - @last_stale_cleanup_cycle = now @flush_interval = @flush_interval.to_i + if @flush_interval > 0 + @flusher = Interval.start(@flush_interval, -> { flush_pending_files }) + end + + @last_stale_cleanup_cycle = Time.now @stale_cleanup_interval = 10 end # def register private def validate_path @@ -139,19 +140,20 @@ fd.write(chunks.last) else # append to the file chunks.each {|chunk| fd.write(chunk) } end - flush(fd) + fd.flush unless @flusher && @flusher.alive? end close_stale_files end end # def receive public def close + @flusher.stop unless @flusher.nil? @io_mutex.synchronize do @logger.debug("Close: closing files") @files.each do |path, fd| begin @@ -198,31 +200,24 @@ def extract_file_root parts = File.expand_path(path).split(File::SEPARATOR) parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR) end + # the back-bone of @flusher, our periodic-flushing interval. private - def flush(fd) - if flush_interval > 0 - flush_pending_files - else - fd.flush - end - end - - # every flush_interval seconds or so (triggered by events, but if there are no events there's no point flushing files anyway) - private def flush_pending_files - return unless Time.now - @last_flush_cycle >= flush_interval - @logger.debug("Starting flush cycle") + @io_mutex.synchronize do + @logger.debug("Starting flush cycle") - @files.each do |path, fd| - @logger.debug("Flushing file", :path => path, :fd => fd) - fd.flush + @files.each do |path, fd| + @logger.debug("Flushing file", :path => path, :fd => fd) + fd.flush + end end - - @last_flush_cycle = Time.now + rescue => e + # squash exceptions caught while flushing after logging them + @logger.error("Exception flushing files", :exception => e.message, :backtrace => e.backtrace) end # every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway) private def close_stale_files @@ -293,9 +288,82 @@ if gzip fd = Zlib::GzipWriter.new(fd) end @files[path] = IOWriter.new(fd) end + + ## + # Bare-bones utility for running a block of code at an interval. + # + class Interval + ## + # Initializes a new Interval with the given arguments and starts it before returning it. + # + # @param interval [Integer] (see: Interval#initialize) + # @param procsy [#call] (see: Interval#initialize) + # + # @return [Interval] + # + def self.start(interval, procsy) + self.new(interval, procsy).tap(&:start) + end + + ## + # @param interval [Integer]: time in seconds to wait between calling the given proc + # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions. + def initialize(interval, procsy) + @interval = interval + @procsy = procsy + + require 'thread' # Mutex, ConditionVariable, etc. + @mutex = Mutex.new + @sleeper = ConditionVariable.new + end + + ## + # Starts the interval, or returns if it has already been started. + # + # @return [void] + def start + @mutex.synchronize do + return if @thread && @thread.alive? + + @thread = Thread.new { run } + end + end + + ## + # Stop the interval. + # Does not interrupt if execution is in-progress. + def stop + @mutex.synchronize do + @stopped = true + end + + @thread && @thread.join + end + + ## + # @return [Boolean] + def alive? + @thread && @thread.alive? + end + + private + + def run + @mutex.synchronize do + loop do + @sleeper.wait(@mutex, @interval) + break if @stopped + + @procsy.call + end + end + ensure + @sleeper.broadcast + end + end # class LogStash::Outputs::File::Interval end # class LogStash::Outputs::File # wrapper class class IOWriter def initialize(io)