lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.0.5 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.0.6

- old
+ new

@@ -11,71 +11,76 @@ config_param :pos_file, :string, default: nil config_param :read_from_head, :bool, default: false config_param :strip_underscores, :bool, default: false config_param :tag, :string - attr_reader :tag - def configure(conf) super - @pos_writer = PosWriter.new(conf["pos_file"]) - @journal = Systemd::Journal.new(path: path) - @read_from_head = conf["read_from_head"] - journal.filter(*filters) + @pos_writer = PosWriter.new(@pos_file) + @journal = Systemd::Journal.new(path: @path) + @journal.filter(*@filters) seek end def start super @running = true - pos_writer.start + @pos_writer.start @thread = Thread.new(&method(:run)) end def shutdown super @running = false @thread.join - pos_writer.shutdown + @pos_writer.shutdown end private - attr_reader :journal, :running, :lock, :cursor, :path, :pos_writer, :strip_underscores, :read_from_head - def seek - journal.seek(@pos_writer.cursor || read_from) + seek_to(@pos_writer.cursor || read_from) rescue Systemd::JournalError log.warn("Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}") - journal.seek(read_from) + seek_to(read_from) end + # according to https://github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644 + # and https://bugs.freedesktop.org/show_bug.cgi?id=64614, after doing a seek(:tail), + # you must move back in such a way that the next move_next will return the last + # record + def seek_to(pos) + @journal.seek(pos) + return unless pos == :tail + @journal.move(-2) + end + def read_from - read_from_head ? :head : :tail + @read_from_head ? :head : :tail end def run Thread.current.abort_on_exception = true watch do |entry| begin - router.emit(tag, entry.realtime_timestamp.to_i, formatted(entry)) + router.emit(@tag, entry.realtime_timestamp.to_i, formatted(entry)) rescue => e log.error("Exception emitting record: #{e}") end end end def formatted(entry) - return entry.to_h unless strip_underscores + return entry.to_h unless @strip_underscores Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }] end def watch - while running - next unless journal.wait(1_000_000) - while journal.move_next && running - yield journal.current_entry - pos_writer.update(journal.cursor) + while @running + next unless @journal.wait(1_000_000) + while @journal.move_next && @running + yield @journal.current_entry + @pos_writer.update(@journal.cursor) end end end end end