lib/logstash/outputs/file.rb in logstash-output-file-4.2.6 vs lib/logstash/outputs/file.rb in logstash-output-file-4.3.0
- old
+ new
@@ -71,14 +71,18 @@
# be written at the end of the file.
#
# If `overwrite`, the file will be truncated before writing and only the most
# recent event will appear in the file.
config :write_behavior, :validate => [ "overwrite", "append" ], :default => "append"
+
+ # How often should the stale files cleanup cycle run (in seconds).
+ # The stale files cleanup cycle closes inactive files (i.e files not written to since the last cycle).
+ config :stale_cleanup_interval, :validate => :number, :default => 10
+
default :codec, "json_lines"
- public
def register
require "fileutils" # For mkdir_p
@files = {}
@io_mutex = Mutex.new
@@ -98,33 +102,12 @@
if @flush_interval > 0
@flusher = Interval.start(@flush_interval, -> { flush_pending_files })
end
@last_stale_cleanup_cycle = Time.now
- @stale_cleanup_interval = 10
- end # def register
-
- private
- def validate_path
- if (root_directory =~ FIELD_REF) != nil
- @logger.error("File: The starting part of the path should not be dynamic.", :path => @path)
- raise LogStash::ConfigurationError.new("The starting part of the path should not be dynamic.")
- end
end
- private
- def root_directory
- parts = @path.split(File::SEPARATOR).select { |item| !item.empty? }
- if Gem.win_platform?
- # First part is the drive letter
- parts[1]
- else
- parts.first
- end
- end
-
- public
def multi_receive_encoded(events_and_encoded)
encoded_by_path = Hash.new {|h,k| h[k] = []}
events_and_encoded.each do |event,encoded|
file_output_path = event_path(event)
@@ -145,13 +128,12 @@
fd.flush unless @flusher && @flusher.alive?
end
close_stale_files
end
- end # def receive
+ end
- public
def close
@flusher.stop unless @flusher.nil?
@io_mutex.synchronize do
@logger.debug("Close: closing files")
@@ -165,16 +147,33 @@
end
end
end
private
+
+ def validate_path
+ if (root_directory =~ FIELD_REF) != nil
+ @logger.error("File: The starting part of the path should not be dynamic.", :path => @path)
+ raise LogStash::ConfigurationError.new("The starting part of the path should not be dynamic.")
+ end
+ end
+
+ def root_directory
+ parts = @path.split(File::SEPARATOR).select { |item| !item.empty? }
+ if Gem.win_platform?
+ # First part is the drive letter
+ parts[1]
+ else
+ parts.first
+ end
+ end
+
def inside_file_root?(log_path)
target_file = File.expand_path(log_path)
return target_file.start_with?("#{@file_root.to_s}/")
end
- private
def event_path(event)
file_output_path = generate_filepath(event)
if path_with_field_ref? && !inside_file_root?(file_output_path)
@logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path)
file_output_path = @failure_path
@@ -184,28 +183,24 @@
@logger.debug("File, writing event to file.", :filename => file_output_path)
file_output_path
end
- private
def generate_filepath(event)
event.sprintf(@path)
end
- private
def path_with_field_ref?
path =~ FIELD_REF
end
- private
def extract_file_root
parts = File.expand_path(path).split(File::SEPARATOR)
parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR)
end
# the back-bone of @flusher, our periodic-flushing interval.
- private
def flush_pending_files
@io_mutex.synchronize do
@logger.debug("Starting flush cycle")
@files.each do |path, fd|
@@ -217,11 +212,10 @@
# squash exceptions caught while flushing after logging them
@logger.error("Exception flushing files", :exception => e.message, :backtrace => e.backtrace)
end
# every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway)
- private
def close_stale_files
now = Time.now
return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval
@logger.debug("Starting stale files cleanup cycle", :files => @files)
@@ -235,21 +229,18 @@
# mark all files as inactive, a call to write will mark them as active again
@files.each { |path, fd| fd.active = false }
@last_stale_cleanup_cycle = now
end
- private
def cached?(path)
@files.include?(path) && !@files[path].nil?
end
- private
def deleted?(path)
!File.exist?(path)
end
- private
def open(path)
if !deleted?(path) && cached?(path)
return @files[path]
end
@@ -360,33 +351,37 @@
end
end
ensure
@sleeper.broadcast
end
- end # class LogStash::Outputs::File::Interval
-end # class LogStash::Outputs::File
+ end
+end
# wrapper class
class IOWriter
+ attr_accessor :active
+
def initialize(io)
@io = io
end
+
def write(*args)
@io.write(*args)
@active = true
end
+
def flush
@io.flush
if @io.class == Zlib::GzipWriter
@io.to_io.flush
end
end
+
def method_missing(method_name, *args, &block)
if @io.respond_to?(method_name)
@io.send(method_name, *args, &block)
else
super
end
end
- attr_accessor :active
end