lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.2.0 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.3.0
- old
+ new
@@ -1,35 +1,52 @@
+# frozen_string_literal: true
require "systemd/journal"
require "fluent/plugin/input"
require "fluent/plugin/systemd/pos_writer"
+require "fluent/plugin/systemd/entry_mutator"
module Fluent
module Plugin
class SystemdInput < Input
Fluent::Plugin.register_input("systemd", self)
helpers :timer, :storage
- DEFAULT_STORAGE_TYPE = "local".freeze
+ DEFAULT_STORAGE_TYPE = "local"
config_param :path, :string, default: "/var/log/journal"
config_param :filters, :array, default: []
config_param :pos_file, :string, default: nil, deprecated: "Use <storage> section with `persistent: true' instead"
config_param :read_from_head, :bool, default: false
- config_param :strip_underscores, :bool, default: false
+ config_param :strip_underscores, :bool, default: false, deprecated: "Use <entry> section or `systemd_entry` " \
+ "filter plugin instead"
config_param :tag, :string
config_section :storage do
config_set_default :usage, "positions"
config_set_default :@type, DEFAULT_STORAGE_TYPE
config_set_default :persistent, false
end
+ config_section :entry, param_name: "entry_opts", required: false, multi: false do
+ config_param :field_map, :hash, default: {}
+ config_param :field_map_strict, :bool, default: false
+ config_param :fields_strip_underscores, :bool, default: false
+ config_param :fields_lowercase, :bool, default: false
+ end
+
def configure(conf)
super
+ @journal = nil
@pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions"))
- @journal = nil
+ # legacy strip_underscores backwards compatibility (legacy takes
+ # precedence and is mutually exclusive with the entry block)
+ mut_opts = @strip_underscores ? { fields_strip_underscores: true } : @entry_opts.to_h
+ @mutator = SystemdEntryMutator.new(**mut_opts)
+ if @mutator.field_map_strict && @mutator.field_map.empty?
+ log.warn("`field_map_strict` set to true with empty `field_map`, expect no fields")
+ end
end
def start
super
@pos_storage.start
@@ -87,20 +104,27 @@
def run
return unless @journal || init_journal
init_journal if @journal.wait(0) == :invalidate
watch do |entry|
- begin
- router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry))
- rescue => e
- log.error("Exception emitting record: #{e}")
- end
+ emit(entry)
end
end
+ def emit(entry)
+ router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry))
+ rescue Fluent::Plugin::Buffer::BufferOverflowError => e
+ retries ||= 0
+ raise e if retries > 10
+ retries += 1
+ sleep 1.5**retries + rand(0..3)
+ retry
+ rescue => e
+ log.error("Exception emitting record: #{e}")
+ end
+
def formatted(entry)
- return entry.to_h unless @strip_underscores
- Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }]
+ @mutator.run(entry)
end
def watch
while @journal.move_next
yield @journal.current_entry