lib/logstash/outputs/file.rb in logstash-output-file-4.2.6 vs lib/logstash/outputs/file.rb in logstash-output-file-4.3.0

- old
+ new

@@ -71,14 +71,18 @@ # be written at the end of the file. # # If `overwrite`, the file will be truncated before writing and only the most # recent event will appear in the file. config :write_behavior, :validate => [ "overwrite", "append" ], :default => "append" + + # How often should the stale files cleanup cycle run (in seconds). + # The stale files cleanup cycle closes inactive files (i.e files not written to since the last cycle). + config :stale_cleanup_interval, :validate => :number, :default => 10 + default :codec, "json_lines" - public def register require "fileutils" # For mkdir_p @files = {} @io_mutex = Mutex.new @@ -98,33 +102,12 @@ if @flush_interval > 0 @flusher = Interval.start(@flush_interval, -> { flush_pending_files }) end @last_stale_cleanup_cycle = Time.now - @stale_cleanup_interval = 10 - end # def register - - private - def validate_path - if (root_directory =~ FIELD_REF) != nil - @logger.error("File: The starting part of the path should not be dynamic.", :path => @path) - raise LogStash::ConfigurationError.new("The starting part of the path should not be dynamic.") - end end - private - def root_directory - parts = @path.split(File::SEPARATOR).select { |item| !item.empty? } - if Gem.win_platform? - # First part is the drive letter - parts[1] - else - parts.first - end - end - - public def multi_receive_encoded(events_and_encoded) encoded_by_path = Hash.new {|h,k| h[k] = []} events_and_encoded.each do |event,encoded| file_output_path = event_path(event) @@ -145,13 +128,12 @@ fd.flush unless @flusher && @flusher.alive? end close_stale_files end - end # def receive + end - public def close @flusher.stop unless @flusher.nil? @io_mutex.synchronize do @logger.debug("Close: closing files") @@ -165,16 +147,33 @@ end end end private + + def validate_path + if (root_directory =~ FIELD_REF) != nil + @logger.error("File: The starting part of the path should not be dynamic.", :path => @path) + raise LogStash::ConfigurationError.new("The starting part of the path should not be dynamic.") + end + end + + def root_directory + parts = @path.split(File::SEPARATOR).select { |item| !item.empty? } + if Gem.win_platform? + # First part is the drive letter + parts[1] + else + parts.first + end + end + def inside_file_root?(log_path) target_file = File.expand_path(log_path) return target_file.start_with?("#{@file_root.to_s}/") end - private def event_path(event) file_output_path = generate_filepath(event) if path_with_field_ref? && !inside_file_root?(file_output_path) @logger.warn("File: the event tried to write outside the files root, writing the event to the failure file", :event => event, :filename => @failure_path) file_output_path = @failure_path @@ -184,28 +183,24 @@ @logger.debug("File, writing event to file.", :filename => file_output_path) file_output_path end - private def generate_filepath(event) event.sprintf(@path) end - private def path_with_field_ref? path =~ FIELD_REF end - private def extract_file_root parts = File.expand_path(path).split(File::SEPARATOR) parts.take_while { |part| part !~ FIELD_REF }.join(File::SEPARATOR) end # the back-bone of @flusher, our periodic-flushing interval. - private def flush_pending_files @io_mutex.synchronize do @logger.debug("Starting flush cycle") @files.each do |path, fd| @@ -217,11 +212,10 @@ # squash exceptions caught while flushing after logging them @logger.error("Exception flushing files", :exception => e.message, :backtrace => e.backtrace) end # every 10 seconds or so (triggered by events, but if there are no events there's no point closing files anyway) - private def close_stale_files now = Time.now return unless now - @last_stale_cleanup_cycle >= @stale_cleanup_interval @logger.debug("Starting stale files cleanup cycle", :files => @files) @@ -235,21 +229,18 @@ # mark all files as inactive, a call to write will mark them as active again @files.each { |path, fd| fd.active = false } @last_stale_cleanup_cycle = now end - private def cached?(path) @files.include?(path) && !@files[path].nil? end - private def deleted?(path) !File.exist?(path) end - private def open(path) if !deleted?(path) && cached?(path) return @files[path] end @@ -360,33 +351,37 @@ end end ensure @sleeper.broadcast end - end # class LogStash::Outputs::File::Interval -end # class LogStash::Outputs::File + end +end # wrapper class class IOWriter + attr_accessor :active + def initialize(io) @io = io end + def write(*args) @io.write(*args) @active = true end + def flush @io.flush if @io.class == Zlib::GzipWriter @io.to_io.flush end end + def method_missing(method_name, *args, &block) if @io.respond_to?(method_name) @io.send(method_name, *args, &block) else super end end - attr_accessor :active end