lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.1.1 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.2.0

- old
+ new

@@ -5,33 +5,41 @@ module Fluent module Plugin class SystemdInput < Input Fluent::Plugin.register_input("systemd", self) - helpers :timer + helpers :timer, :storage + DEFAULT_STORAGE_TYPE = "local".freeze + config_param :path, :string, default: "/var/log/journal" config_param :filters, :array, default: [] - config_param :pos_file, :string, default: nil + config_param :pos_file, :string, default: nil, deprecated: "Use <storage> section with `persistent: true' instead" config_param :read_from_head, :bool, default: false config_param :strip_underscores, :bool, default: false config_param :tag, :string + config_section :storage do + config_set_default :usage, "positions" + config_set_default :@type, DEFAULT_STORAGE_TYPE + config_set_default :persistent, false + end + def configure(conf) super - @pos_writer = PosWriter.new(@pos_file) + @pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions")) @journal = nil end def start super - @pos_writer.start + @pos_storage.start timer_execute(:in_systemd_emit_worker, 1, &method(:run)) end def shutdown - @pos_writer.shutdown + @pos_storage.shutdown super end private @@ -47,14 +55,15 @@ log.warn("#{e.class}: #{e.message} retrying in 1s") false end def seek - seek_to(@pos_writer.cursor || read_from) + cursor = @pos_storage.get(:journal) + seek_to(cursor || read_from) rescue Systemd::JournalError log.warn( - "Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}, " \ + "Could not seek to cursor #{cursor} found in pos file: #{@pos_storage.path}, " \ "falling back to reading from #{read_from}", ) seek_to(read_from) end @@ -94,10 +103,10 @@ end def watch while @journal.move_next yield @journal.current_entry - @pos_writer.update(@journal.cursor) + @pos_storage.put(:journal, @journal.cursor) end end end end end