Sha256: 610f90e1a5a9c62edfa3792dc6733ba3757567959fb155c1ae6c60430867b68d

Contents?: true

Size: 1.84 KB

Versions: 1

Compression:

Stored size: 1.84 KB

Contents

require "systemd/journal"
require "fluent/plugin/systemd/pos_writer"

module Fluent
  # Fluentd input plugin, registered as "systemd", that follows a systemd
  # journal and emits every entry it reads as an event under the configured tag.
  class SystemdInput < Input
    Fluent::Plugin.register_input("systemd", self)

    # Journal location handed to Systemd::Journal.new.
    config_param :path, :string, default: "/var/log/journal"
    # Arguments splatted into Systemd::Journal#filter.
    config_param :filters, :array, default: []
    # Path handed to PosWriter; nil when not configured.
    config_param :pos_file, :string, default: nil
    # Where to start when no stored cursor is usable: head if true, tail otherwise.
    config_param :read_from_head, :bool, default: false
    # When true, leading underscores are removed from the keys of emitted records.
    config_param :strip_underscores, :bool, default: false
    # Tag used for every emitted event.
    config_param :tag, :string

    attr_reader :tag

    # Builds the position writer and the journal handle from the parsed
    # configuration, applies the filters and seeks to the starting position.
    #
    # @param conf the plugin configuration passed in by Fluentd
    def configure(conf)
      super
      # Use the values converted by config_param rather than the raw conf
      # strings: the raw "false" string is truthy, so copying
      # conf["read_from_head"] over the parsed boolean made an explicit
      # `read_from_head false` behave like `true`.
      @pos_writer = PosWriter.new(@pos_file)
      @journal = Systemd::Journal.new(path: path)
      journal.filter(*filters)
      seek
    end

    # Starts the position writer and the background thread that reads the journal.
    def start
      super
      @running = true
      pos_writer.start
      @thread = Thread.new(&method(:run))
    end

    # Signals the reader thread to stop, waits for it, then shuts down the
    # position writer.
    def shutdown
      @running = false
      # @thread is only assigned in #start; tolerate shutdown without start.
      @thread.join if @thread
      pos_writer.shutdown
      super
    end

    private

    attr_reader :journal, :running, :path, :pos_writer, :strip_underscores, :read_from_head

    # Seeks to the cursor held by the position writer, or to the configured
    # default position when there is none. If the journal rejects the seek, a
    # warning is logged and the default position is used instead.
    def seek
      journal.seek(@pos_writer.cursor || read_from)
    rescue Systemd::JournalError
      log.warn("Could not seek to cursor #{@pos_writer.cursor} found in pos file: #{@pos_writer.path}")
      journal.seek(read_from)
    end

    # @return [Symbol] :head when read_from_head is set, :tail otherwise
    def read_from
      read_from_head ? :head : :tail
    end

    # Body of the reader thread: emits one event per journal entry, using the
    # entry's realtime timestamp (converted with to_i) as the event time.
    def run
      watch do |entry|
        router.emit(tag, entry.realtime_timestamp.to_i, formatted(entry))
      end
    end

    # Converts a journal entry into the record hash to emit.
    #
    # @param entry a journal entry responding to #to_h
    # @return [Hash] the entry's fields, with leading underscores removed from
    #   the keys when strip_underscores is enabled
    def formatted(entry)
      return entry.to_h unless strip_underscores
      # The pattern is anchored at the start of the key, so a single
      # substitution is sufficient.
      Hash[entry.to_h.map { |k, v| [k.sub(/\A_+/, ""), v] }]
    end

    # Yields each new journal entry until the plugin is asked to stop, and
    # records the journal cursor with the position writer after each entry has
    # been yielded.
    def watch
      while running
        # Waiting with a timeout lets the loop re-check `running` regularly.
        # NOTE(review): the timeout is presumably in microseconds (one second);
        # confirm against the systemd-journal gem.
        next unless journal.wait(1_000_000)
        while journal.move_next && running
          yield journal.current_entry
          pos_writer.update(journal.cursor)
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-systemd-0.0.3 lib/fluent/plugin/in_systemd.rb