lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.0 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.1.pre

- old
+ new

@@ -17,13 +17,10 @@ config_param :tag, :string def configure(conf) super @pos_writer = PosWriter.new(@pos_file) - @journal = Systemd::Journal.new(path: @path) - @journal.filter(*@filters) - seek end def start super @pos_writer.start @@ -35,10 +32,16 @@ super end private + def init_journal + @journal = Systemd::Journal.new(path: @path) + @journal.filter(*@filters) + seek + end + def seek seek_to(@pos_writer.cursor || read_from) rescue Systemd::JournalError log.warn("Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}") seek_to(read_from) @@ -57,17 +60,19 @@ def read_from @read_from_head ? :head : :tail end def run + init_journal Thread.current.abort_on_exception = true watch do |entry| begin router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry)) rescue => e log.error("Exception emitting record: #{e}") end end + @pos_writer.sync end def formatted(entry) return entry.to_h unless @strip_underscores Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]