lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.1.pre2 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.1.pre3
- old
+ new
@@ -17,10 +17,11 @@
config_param :tag, :string
def configure(conf)
super
@pos_writer = PosWriter.new(@pos_file)
+ init_journal
end
def start
super
@pos_writer.start
@@ -34,10 +35,13 @@
private
def init_journal
@journal = Systemd::Journal.new(path: @path)
+ # make sure initial call to wait doesn't return :invalidate
+ # see https://github.com/ledbettj/systemd-journal/issues/70
+ @journal.wait(0)
@journal.filter(*@filters)
seek
end
def seek
@@ -64,19 +68,17 @@
def read_from
@read_from_head ? :head : :tail
end
def run
- init_journal
- Thread.current.abort_on_exception = true
+ init_journal if @journal.wait(0) == :invalidate
watch do |entry|
begin
router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry))
rescue => e
log.error("Exception emitting record: #{e}")
end
end
- @pos_writer.sync
end
def formatted(entry)
return entry.to_h unless @strip_underscores
Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]