lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.0.11 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.0
- old
+ new
@@ -1,99 +1,85 @@
require "systemd/journal"
-require "fluent/input"
+require "fluent/plugin/input"
require "fluent/plugin/systemd/pos_writer"
module Fluent
- class SystemdInput < Input
- Fluent::Plugin.register_input("systemd", self)
+ module Plugin
+ class SystemdInput < Input
+ Fluent::Plugin.register_input("systemd", self)
- config_param :path, :string, default: "/var/log/journal"
- config_param :filters, :array, default: []
- config_param :pos_file, :string, default: nil
- config_param :read_from_head, :bool, default: false
- config_param :strip_underscores, :bool, default: false
- config_param :tag, :string
+ helpers :timer
- def configure(conf)
- super
- @pos_writer = PosWriter.new(@pos_file)
- end
+ config_param :path, :string, default: "/var/log/journal"
+ config_param :filters, :array, default: []
+ config_param :pos_file, :string, default: nil
+ config_param :read_from_head, :bool, default: false
+ config_param :strip_underscores, :bool, default: false
+ config_param :tag, :string
- def start
- super
- @running = true
- @pos_writer.start
- @thread = Thread.new(&method(:run))
- end
+ def configure(conf)
+ super
+ @pos_writer = PosWriter.new(@pos_file)
+ @journal = Systemd::Journal.new(path: @path)
+ @journal.filter(*@filters)
+ seek
+ end
- def shutdown
- super
- @running = false
- @thread.join
- @pos_writer.shutdown
- end
+ def start
+ super
+ @pos_writer.start
+ timer_execute(:in_systemd_emit_worker, 1, &method(:run))
+ end
- private
+ def shutdown
+ @pos_writer.shutdown
+ super
+ end
- def init_journal
- @journal.close if @journal
- @journal = Systemd::Journal.new(path: @path)
- @journal.filter(*@filters)
- seek
- end
+ private
- def seek
- seek_to(@pos_writer.cursor || read_from)
- rescue Systemd::JournalError
- log.warn("Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}")
- seek_to(read_from)
- end
+ def seek
+ seek_to(@pos_writer.cursor || read_from)
+ rescue Systemd::JournalError
+ log.warn("Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}")
+ seek_to(read_from)
+ end
- # according to https://github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644
- # and https://bugs.freedesktop.org/show_bug.cgi?id=64614, after doing a seek(:tail),
- # you must move back in such a way that the next move_next will return the last
- # record
- def seek_to(pos)
- @journal.seek(pos)
- return unless pos == :tail
- @journal.move(-2)
- end
+ # according to https://github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644
+ # and https://bugs.freedesktop.org/show_bug.cgi?id=64614, after doing a seek(:tail),
+ # you must move back in such a way that the next move_next will return the last
+ # record
+ def seek_to(pos)
+ @journal.seek(pos)
+ return unless pos == :tail
+ @journal.move(-2)
+ end
- def read_from
- @read_from_head ? :head : :tail
- end
+ def read_from
+ @read_from_head ? :head : :tail
+ end
- def run
- init_journal
- Thread.current.abort_on_exception = true
- watch do |entry|
- begin
- router.emit(@tag, entry.realtime_timestamp.to_i, formatted(entry))
- rescue => e # rubocop:disable Style/RescueStandardError
- log.error("Exception emitting record: #{e}")
+ def run
+ Thread.current.abort_on_exception = true
+ watch do |entry|
+ begin
+ router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry))
+ rescue => e
+ log.error("Exception emitting record: #{e}")
+ end
end
end
- end
- def formatted(entry)
- return entry.to_h unless @strip_underscores
- Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]
- end
+ def formatted(entry)
+ return entry.to_h unless @strip_underscores
+ Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]
+ end
- def watch
- while @running
- init_journal if @journal.wait(0) == :invalidate
- while @journal.move_next && @running
- begin
- yield @journal.current_entry
- rescue Systemd::JournalError => e
- log.warn("Error Parsing Journal: #{e.class}: #{e.message}")
- next
- end
+ def watch
+ while @journal.move_next
+ yield @journal.current_entry
@pos_writer.update(@journal.cursor)
end
- # prevent a loop of death
- sleep 1
end
end
end
end