lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.3.1 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-1.0.0.rc1
- old
+ new
@@ -1,65 +1,52 @@
# frozen_string_literal: true
-require "systemd/journal"
-require "fluent/plugin/input"
-require "fluent/plugin/systemd/pos_writer"
-require "fluent/plugin/systemd/entry_mutator"
+require 'systemd/journal'
+require 'fluent/plugin/input'
+require 'fluent/plugin/systemd/entry_mutator'
+
module Fluent
module Plugin
- class SystemdInput < Input # rubocop:disable Metrics/ClassLength
- Fluent::Plugin.register_input("systemd", self)
+ # Fluentd plugin for reading from the systemd journal
+ class SystemdInput < Input
+ Fluent::Plugin.register_input('systemd', self)
helpers :timer, :storage
- DEFAULT_STORAGE_TYPE = "local"
+ DEFAULT_STORAGE_TYPE = 'local'
- config_param :path, :string, default: "/var/log/journal"
+ config_param :path, :string, default: '/var/log/journal'
config_param :filters, :array, default: []
- config_param :pos_file, :string, default: nil, deprecated: "Use <storage> section with `persistent: true' instead"
config_param :read_from_head, :bool, default: false
- config_param :strip_underscores, :bool, default: false, deprecated: "Use <entry> section or `systemd_entry` " \
- "filter plugin instead"
config_param :tag, :string
config_section :storage do
- config_set_default :usage, "positions"
+ config_set_default :usage, 'positions'
config_set_default :@type, DEFAULT_STORAGE_TYPE
config_set_default :persistent, false
end
- config_section :entry, param_name: "entry_opts", required: false, multi: false do
+ config_section :entry, param_name: 'entry_opts', required: false, multi: false do
config_param :field_map, :hash, default: {}
config_param :field_map_strict, :bool, default: false
config_param :fields_strip_underscores, :bool, default: false
config_param :fields_lowercase, :bool, default: false
end
def configure(conf)
super
@journal = nil
- @pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions"))
- # legacy strip_underscores backwards compatibility (legacy takes
- # precedence and is mutually exclusive with the entry block)
- mut_opts = @strip_underscores ? { fields_strip_underscores: true } : @entry_opts.to_h
- @mutator = SystemdEntryMutator.new(**mut_opts)
- if @mutator.field_map_strict && @mutator.field_map.empty?
- log.warn("`field_map_strict` set to true with empty `field_map`, expect no fields")
- end
+ @pos_storage = storage_create(usage: 'positions')
+ @mutator = SystemdEntryMutator.new(**@entry_opts.to_h)
+ @mutator.warnings.each { |warning| log.warn(warning) }
end
def start
super
- @pos_storage.start
timer_execute(:in_systemd_emit_worker, 1, &method(:run))
end
- def shutdown
- @pos_storage.shutdown
- super
- end
-
private
def init_journal
# TODO: ruby 2.3
@journal.close if @journal # rubocop:disable Style/SafeNavigation
@@ -78,12 +65,12 @@
def seek
cursor = @pos_storage.get(:journal)
seek_to(cursor || read_from)
rescue Systemd::JournalError
log.warn(
- "Could not seek to cursor #{cursor} found in pos file: #{@pos_storage.path}, " \
- "falling back to reading from #{read_from}",
+ "Could not seek to cursor #{cursor} found in position file: #{@pos_storage.path}, " \
+ "falling back to reading from #{read_from}"
)
seek_to(read_from)
end
# according to https://github.com/ledbettj/systemd-journal/issues/64#issuecomment-271056644
@@ -118,10 +105,10 @@
retries ||= 0
raise e if retries > 10
retries += 1
sleep 1.5**retries + rand(0..3)
retry
- rescue => e
+ rescue => e # rubocop:disable Style/RescueStandardError
log.error("Exception emitting record: #{e}")
end
def formatted(entry)
@mutator.run(entry)