lib/filewatch/read_mode/handlers/base.rb in logstash-input-file-4.1.3 vs lib/filewatch/read_mode/handlers/base.rb in logstash-input-file-4.1.4
- old
+ new
@@ -17,11 +17,11 @@
def quit?
@processor.watch.quit?
end
def handle(watched_file)
- logger.debug("handling: #{watched_file.path}")
+ logger.trace("handling: #{watched_file.path}")
unless watched_file.has_listener?
watched_file.set_listener(@observer)
end
handle_specifically(watched_file)
end
@@ -32,11 +32,11 @@
private
def open_file(watched_file)
return true if watched_file.file_open?
- logger.debug("opening #{watched_file.path}")
+ logger.trace("opening #{watched_file.path}")
begin
watched_file.open
rescue
# don't emit this message too often. if a file that we can't
# read is changing a lot, we'll try to open it more often, and spam the logs.
@@ -44,11 +44,11 @@
logger.warn("opening OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
if watched_file.last_open_warning_at.nil? || now - watched_file.last_open_warning_at > OPEN_WARN_INTERVAL
logger.warn("failed to open #{watched_file.path}: #{$!.inspect}, #{$!.backtrace.take(3)}")
watched_file.last_open_warning_at = now
else
- logger.debug("suppressed warning for `failed to open` #{watched_file.path}: #{$!.inspect}")
+ logger.trace("suppressed warning for `failed to open` #{watched_file.path}: #{$!.inspect}")
end
watched_file.watch # set it back to watch so we can try it again
end
if watched_file.file_open?
watched_file.listener.opened
@@ -63,24 +63,37 @@
if sincedb_value.nil?
add_new_value_sincedb_collection(watched_file)
elsif sincedb_value.watched_file == watched_file
update_existing_sincedb_collection_value(watched_file, sincedb_value)
else
- logger.warn? && logger.warn("mismatch on sincedb_value.watched_file, this should have been handled by Discoverer")
+ msg = "add_or_update_sincedb_collection: the found sincedb_value has a watched_file - this is a rename, switching inode to this watched file"
+ logger.trace(msg)
+ existing_watched_file = sincedb_value.watched_file
+ if existing_watched_file.nil?
+ sincedb_value.set_watched_file(watched_file)
+ logger.trace("add_or_update_sincedb_collection: switching as new file")
+ watched_file.rotate_as_file
+ watched_file.update_bytes_read(sincedb_value.position)
+ else
+ sincedb_value.set_watched_file(watched_file)
+ logger.trace("add_or_update_sincedb_collection: switching from...", "watched_file details" => watched_file.details)
+ watched_file.rotate_from(existing_watched_file)
+ end
+
end
watched_file.initial_completed
end
def update_existing_sincedb_collection_value(watched_file, sincedb_value)
- logger.debug("update_existing_sincedb_collection_value: #{watched_file.path}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
+ logger.trace("update_existing_sincedb_collection_value: #{watched_file.path}, last value #{sincedb_value.position}, cur size #{watched_file.last_stat_size}")
# sincedb_value is the source of truth
watched_file.update_bytes_read(sincedb_value.position)
end
def add_new_value_sincedb_collection(watched_file)
sincedb_value = SincedbValue.new(0)
sincedb_value.set_watched_file(watched_file)
- logger.debug("add_new_value_sincedb_collection: #{watched_file.path}", "position" => sincedb_value.position)
+ logger.trace("add_new_value_sincedb_collection: #{watched_file.path}", "position" => sincedb_value.position)
sincedb_collection.set(watched_file.sincedb_key, sincedb_value)
end
end
end end end