lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.0.5 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.0.6
- old
+ new
@@ -11,71 +11,76 @@
config_param :pos_file, :string, default: nil
config_param :read_from_head, :bool, default: false
config_param :strip_underscores, :bool, default: false
config_param :tag, :string
- attr_reader :tag
-
def configure(conf)
super
- @pos_writer = PosWriter.new(conf["pos_file"])
- @journal = Systemd::Journal.new(path: path)
- @read_from_head = conf["read_from_head"]
- journal.filter(*filters)
+ @pos_writer = PosWriter.new(@pos_file)
+ @journal = Systemd::Journal.new(path: @path)
+ @journal.filter(*@filters)
seek
end
def start
super
@running = true
- pos_writer.start
+ @pos_writer.start
@thread = Thread.new(&method(:run))
end
def shutdown
super
@running = false
@thread.join
- pos_writer.shutdown
+ @pos_writer.shutdown
end
private
- attr_reader :journal, :running, :lock, :cursor, :path, :pos_writer, :strip_underscores, :read_from_head
-
def seek
- journal.seek(@pos_writer.cursor || read_from)
+ seek_to(@pos_writer.cursor || read_from)
rescue Systemd::JournalError
log.warn("Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}")
- journal.seek(read_from)
+ seek_to(read_from)
end
+ # according to https://github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644
+ # and https://bugs.freedesktop.org/show_bug.cgi?id=64614, after doing a seek(:tail),
+ # you must move back in such a way that the next move_next will return the last
+ # record
+ def seek_to(pos)
+ @journal.seek(pos)
+ return unless pos == :tail
+ @journal.move(-2)
+ end
+
def read_from
- read_from_head ? :head : :tail
+ @read_from_head ? :head : :tail
end
def run
Thread.current.abort_on_exception = true
watch do |entry|
begin
- router.emit(tag, entry.realtime_timestamp.to_i, formatted(entry))
+ router.emit(@tag, entry.realtime_timestamp.to_i, formatted(entry))
rescue => e
log.error("Exception emitting record: #{e}")
end
end
end
def formatted(entry)
- return entry.to_h unless strip_underscores
+ return entry.to_h unless @strip_underscores
Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]
end
def watch
- while running
- next unless journal.wait(1_000_000)
- while journal.move_next && running
- yield journal.current_entry
- pos_writer.update(journal.cursor)
+ while @running
+ next unless @journal.wait(1_000_000)
+ while @journal.move_next && @running
+ yield @journal.current_entry
+ @pos_writer.update(@journal.cursor)
end
end
end
end
end