lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.3.1 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-1.0.0.rc1

- old
+ new

@@ -1,65 +1,52 @@ # frozen_string_literal: true -require "systemd/journal" -require "fluent/plugin/input" -require "fluent/plugin/systemd/pos_writer" -require "fluent/plugin/systemd/entry_mutator" +require 'systemd/journal' +require 'fluent/plugin/input' +require 'fluent/plugin/systemd/entry_mutator' + module Fluent module Plugin - class SystemdInput < Input # rubocop:disable Metrics/ClassLength - Fluent::Plugin.register_input("systemd", self) + # Fluentd plugin for reading from the systemd journal + class SystemdInput < Input + Fluent::Plugin.register_input('systemd', self) helpers :timer, :storage - DEFAULT_STORAGE_TYPE = "local" + DEFAULT_STORAGE_TYPE = 'local' - config_param :path, :string, default: "/var/log/journal" + config_param :path, :string, default: '/var/log/journal' config_param :filters, :array, default: [] - config_param :pos_file, :string, default: nil, deprecated: "Use <storage> section with `persistent: true' instead" config_param :read_from_head, :bool, default: false - config_param :strip_underscores, :bool, default: false, deprecated: "Use <entry> section or `systemd_entry` " \ - "filter plugin instead" config_param :tag, :string config_section :storage do - config_set_default :usage, "positions" + config_set_default :usage, 'positions' config_set_default :@type, DEFAULT_STORAGE_TYPE config_set_default :persistent, false end - config_section :entry, param_name: "entry_opts", required: false, multi: false do + config_section :entry, param_name: 'entry_opts', required: false, multi: false do config_param :field_map, :hash, default: {} config_param :field_map_strict, :bool, default: false config_param :fields_strip_underscores, :bool, default: false config_param :fields_lowercase, :bool, default: false end def configure(conf) super @journal = nil - @pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions")) - # legacy strip_underscores backwards compatibility (legacy takes - # precedence and is mutually exclusive with the entry block) - mut_opts = @strip_underscores ? { fields_strip_underscores: true } : @entry_opts.to_h - @mutator = SystemdEntryMutator.new(**mut_opts) - if @mutator.field_map_strict && @mutator.field_map.empty? - log.warn("`field_map_strict` set to true with empty `field_map`, expect no fields") - end + @pos_storage = storage_create(usage: 'positions') + @mutator = SystemdEntryMutator.new(**@entry_opts.to_h) + @mutator.warnings.each { |warning| log.warn(warning) } end def start super - @pos_storage.start timer_execute(:in_systemd_emit_worker, 1, &method(:run)) end - def shutdown - @pos_storage.shutdown - super - end - private def init_journal # TODO: ruby 2.3 @journal.close if @journal # rubocop:disable Style/SafeNavigation @@ -78,12 +65,12 @@ def seek cursor = @pos_storage.get(:journal) seek_to(cursor || read_from) rescue Systemd::JournalError log.warn( - "Could not seek to cursor #{cursor} found in pos file: #{@pos_storage.path}, " \ - "falling back to reading from #{read_from}", + "Could not seek to cursor #{cursor} found in position file: #{@pos_storage.path}, " \ + "falling back to reading from #{read_from}" ) seek_to(read_from) end # according to https://github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644 @@ -118,10 +105,10 @@ retries ||= 0 raise e if retries > 10 retries += 1 sleep 1.5**retries + rand(0..3) retry - rescue => e + rescue => e # rubocop:disable Style/RescueStandardError log.error("Exception emitting record: #{e}") end def formatted(entry) @mutator.run(entry)