lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.0 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.1.pre
- old
+ new
@@ -17,13 +17,10 @@
config_param :tag, :string
def configure(conf)
super
@pos_writer = PosWriter.new(@pos_file)
- @journal = Systemd::Journal.new(path: @path)
- @journal.filter(*@filters)
- seek
end
def start
super
@pos_writer.start
@@ -35,10 +32,16 @@
super
end
private
+ def init_journal
+ @journal = Systemd::Journal.new(path: @path)
+ @journal.filter(*@filters)
+ seek
+ end
+
def seek
seek_to(@pos_writer.cursor || read_from)
rescue Systemd::JournalError
log.warn("Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}")
seek_to(read_from)
@@ -57,17 +60,19 @@
def read_from
@read_from_head ? :head : :tail
end
def run
+ init_journal
Thread.current.abort_on_exception = true
watch do |entry|
begin
router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry))
rescue => e
log.error("Exception emitting record: #{e}")
end
end
+ @pos_writer.sync
end
def formatted(entry)
return entry.to_h unless @strip_underscores
Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]