lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.1 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.2.0
- old
+ new
@@ -5,33 +5,41 @@
module Fluent
module Plugin
class SystemdInput < Input
Fluent::Plugin.register_input("systemd", self)
- helpers :timer
+ helpers :timer, :storage
+ DEFAULT_STORAGE_TYPE = "local".freeze
+
config_param :path, :string, default: "/var/log/journal"
config_param :filters, :array, default: []
- config_param :pos_file, :string, default: nil
+ config_param :pos_file, :string, default: nil, deprecated: "Use <storage> section with `persistent: true' instead"
config_param :read_from_head, :bool, default: false
config_param :strip_underscores, :bool, default: false
config_param :tag, :string
+ config_section :storage do
+ config_set_default :usage, "positions"
+ config_set_default :@type, DEFAULT_STORAGE_TYPE
+ config_set_default :persistent, false
+ end
+
def configure(conf)
super
- @pos_writer = PosWriter.new(@pos_file)
+ @pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions"))
@journal = nil
end
def start
super
- @pos_writer.start
+ @pos_storage.start
timer_execute(:in_systemd_emit_worker, 1, &method(:run))
end
def shutdown
- @pos_writer.shutdown
+ @pos_storage.shutdown
super
end
private
@@ -47,14 +55,15 @@
log.warn("#{e.class}: #{e.message} retrying in 1s")
false
end
def seek
- seek_to(@pos_writer.cursor || read_from)
+ cursor = @pos_storage.get(:journal)
+ seek_to(cursor || read_from)
rescue Systemd::JournalError
log.warn(
- "Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}, " \
+ "Could not seek to cursor #{cursor} found in pos file: #{@pos_storage.path}, " \
"falling back to reading from #{read_from}",
)
seek_to(read_from)
end
@@ -94,10 +103,10 @@
end
def watch
while @journal.move_next
yield @journal.current_entry
- @pos_writer.update(@journal.cursor)
+ @pos_storage.put(:journal, @journal.cursor)
end
end
end
end
end