lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.2.0 vs lib/fluent/plugin/in_systemd.rb in fluent-plugin-systemd-0.3.0

- old
+ new

@@ -1,35 +1,52 @@ +# frozen_string_literal: true require "systemd/journal" require "fluent/plugin/input" require "fluent/plugin/systemd/pos_writer" +require "fluent/plugin/systemd/entry_mutator" module Fluent module Plugin class SystemdInput < Input Fluent::Plugin.register_input("systemd", self) helpers :timer, :storage - DEFAULT_STORAGE_TYPE = "local".freeze + DEFAULT_STORAGE_TYPE = "local" config_param :path, :string, default: "/var/log/journal" config_param :filters, :array, default: [] config_param :pos_file, :string, default: nil, deprecated: "Use <storage> section with `persistent: true' instead" config_param :read_from_head, :bool, default: false - config_param :strip_underscores, :bool, default: false + config_param :strip_underscores, :bool, default: false, deprecated: "Use <entry> section or `systemd_entry` " \ + "filter plugin instead" config_param :tag, :string config_section :storage do config_set_default :usage, "positions" config_set_default :@type, DEFAULT_STORAGE_TYPE config_set_default :persistent, false end + config_section :entry, param_name: "entry_opts", required: false, multi: false do + config_param :field_map, :hash, default: {} + config_param :field_map_strict, :bool, default: false + config_param :fields_strip_underscores, :bool, default: false + config_param :fields_lowercase, :bool, default: false + end + def configure(conf) super + @journal = nil @pos_storage = PosWriter.new(@pos_file, storage_create(usage: "positions")) - @journal = nil + # legacy strip_underscores backwards compatibility (legacy takes + # precedence and is mutually exclusive with the entry block) + mut_opts = @strip_underscores ? { fields_strip_underscores: true } : @entry_opts.to_h + @mutator = SystemdEntryMutator.new(**mut_opts) + if @mutator.field_map_strict && @mutator.field_map.empty? + log.warn("`field_map_strict` set to true with empty `field_map`, expect no fields") + end end def start super @pos_storage.start @@ -87,20 +104,27 @@ def run return unless @journal || init_journal init_journal if @journal.wait(0) == :invalidate watch do |entry| - begin - router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry)) - rescue => e - log.error("Exception emitting record: #{e}") - end + emit(entry) end end + def emit(entry) + router.emit(@tag, Fluent::EventTime.from_time(entry.realtime_timestamp), formatted(entry)) + rescue Fluent::Plugin::Buffer::BufferOverflowError => e + retries ||= 0 + raise e if retries > 10 + retries += 1 + sleep 1.5**retries + rand(0..3) + retry + rescue => e + log.error("Exception emitting record: #{e}") + end + def formatted(entry) - return entry.to_h unless @strip_underscores - Hash[entry.to_h.map { |k, v| [k.gsub(/\A_+/, ""), v] }] + @mutator.run(entry) end def watch while @journal.move_next yield @journal.current_entry