lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-1.0.2 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-1.0.3
- old
+ new
@@ -90,10 +90,11 @@
# you must move back in such a way that the next move_next will return the last
# record
def seek_to(pos)
@journal.seek(pos)
return if pos == :head
+
if pos == :tail
@journal.move(-2)
else
@journal.move(1)
end
@@ -103,10 +104,11 @@
@read_from_head ? :head : :tail
end
def run
return unless @journal || init_journal
+
init_journal if @journal.wait(0) == :invalidate
watch do |entry|
emit(entry)
end
end
@@ -114,10 +116,11 @@
def emit(entry)
router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry))
rescue Fluent::Plugin::Buffer::BufferOverflowError => e
retries ||= 0
raise e if retries > 10
+
retries += 1
sleep 1.5**retries + rand(0..3)
retry
rescue => e # rubocop:disable Style/RescueStandardError
log.error("Exception emitting record: #{e}")
@@ -127,9 +130,11 @@
@mutator.run(entry)
end
def watch(&block)
yield_current_entry(&block) while @journal.move_next
+ rescue Systemd::JournalError => e
+ log.warn("Error moving to next Journal entry: #{e.class}: #{e.message}")
end
def yield_current_entry
yield @journal.current_entry
@pos_storage.put(:journal, @journal.cursor)