lib/logstash/outputs/file.rb in logstash-output-file-4.2.4 vs lib/logstash/outputs/file.rb in logstash-output-file-4.2.5
- old
+ new
@@ -92,15 +92,16 @@
else
@file_root = File.dirname(path)
end
@failure_path = File.join(@file_root, @filename_failure)
-
- now = Time.now
- @last_flush_cycle = now
- @last_stale_cleanup_cycle = now
@flush_interval = @flush_interval.to_i
+ if @flush_interval > 0
+ @flusher = Interval.start(@flush_interval, -> { flush_pending_files })
+ end
+
+ @last_stale_cleanup_cycle = Time.now
@stale_cleanup_interval = 10
end # def register
private
def validate_path
@@ -139,19 +140,20 @@
fd.write(chunks.last)
else
# append to the file
chunks.each {|chunk| fd.write(chunk) }
end
- flush(fd)
+ fd.flush unless @flusher && @flusher.alive?
end
close_stale_files
end
end # def receive
public
def close
+ @flusher.stop unless @flusher.nil?
@io_mutex.synchronize do
@logger.debug("Close: closing files")
@files.each do |path, fd|
begin
@@ -198,31 +200,24 @@
def extract_file_root
parts = File.expand_path(path).split(File::SEPARATOR)
parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR)
end
+ # the back-bone of @flusher, our periodic-flushing interval.
private
- def flush(fd)
- if flush_interval > 0
- flush_pending_files
- else
- fd.flush
- end
- end
-
- # every flush_interval seconds or so (triggered by events, but if there are no events there's no point flushing files anyway)
- private
def flush_pending_files
- return unless Time.now - @last_flush_cycle >= flush_interval
- @logger.debug("Starting flush cycle")
+ @io_mutex.synchronize do
+ @logger.debug("Starting flush cycle")
- @files.each do |path, fd|
- @logger.debug("Flushing file", :path => path, :fd => fd)
- fd.flush
+ @files.each do |path, fd|
+ @logger.debug("Flushing file", :path => path, :fd => fd)
+ fd.flush
+ end
end
-
- @last_flush_cycle = Time.now
+ rescue => e
+ # squash exceptions caught while flushing after logging them
+ @logger.error("Exception flushing files", :exception => e.message, :backtrace => e.backtrace)
end
# every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)
private
def close_stale_files
@@ -293,9 +288,82 @@
if gzip
fd = Zlib::GzipWriter.new(fd)
end
@files[path] = IOWriter.new(fd)
end
+
+ ##
+ # Bare-bones utility for running a block of code at an interval.
+ #
+ class Interval
+ ##
+ # Initializes a new Interval with the given arguments and starts it before returning it.
+ #
+ # @param interval [Integer] (see: Interval#initialize)
+ # @param procsy [#call] (see: Interval#initialize)
+ #
+ # @return [Interval]
+ #
+ def self.start(interval, procsy)
+ self.new(interval, procsy).tap(&:start)
+ end
+
+ ##
+ # @param interval [Integer]: time in seconds to wait between calling the given proc
+ # @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions.
+ def initialize(interval, procsy)
+ @interval = interval
+ @procsy = procsy
+
+ require 'thread' # Mutex, ConditionVariable, etc.
+ @mutex = Mutex.new
+ @sleeper = ConditionVariable.new
+ end
+
+ ##
+ # Starts the interval, or returns if it has already been started.
+ #
+ # @return [void]
+ def start
+ @mutex.synchronize do
+ return if @thread && @thread.alive?
+
+ @thread = Thread.new { run }
+ end
+ end
+
+ ##
+ # Stop the interval.
+ # Does not interrupt if execution is in-progress.
+ def stop
+ @mutex.synchronize do
+ @stopped = true
+ end
+
+ @thread && @thread.join
+ end
+
+ ##
+ # @return [Boolean]
+ def alive?
+ @thread && @thread.alive?
+ end
+
+ private
+
+ def run
+ @mutex.synchronize do
+ loop do
+ @sleeper.wait(@mutex, @interval)
+ break if @stopped
+
+ @procsy.call
+ end
+ end
+ ensure
+ @sleeper.broadcast
+ end
+ end # class LogStash::Outputs::File::Interval
end # class LogStash::Outputs::File
# wrapper class
class IOWriter
def initialize(io)