Sha256: b890c877497be1b35d39a5fcc85c11d1fec532a99ee219cf310275c3e44140a1

Contents?: true

Size: 1.58 KB

Versions: 1

Compression:

Stored size: 1.58 KB

Contents

require 'systemd/journal'
require 'fluent/plugin/systemd/pos_writer'

module Fluent
  # Fluentd input plugin that follows a systemd journal and emits every
  # journal entry it reads as an event under the configured tag.
  class SystemdInput < Input
    Fluent::Plugin.register_input('systemd', self)

    # Path the journal is opened from.
    config_param :path, :string, default: '/var/log/journal'
    # Arguments splatted into Systemd::Journal#filter.
    config_param :filters, :array, default: []
    # Handed to PosWriter; the cursor it reports (if any) is where reading
    # resumes.
    config_param :pos_file, :string, default: nil
    # When no stored cursor is available: true seeks to the head of the
    # journal, false seeks to the tail.
    config_param :read_from_head, :bool, default: false
    # When true, leading underscores are removed from entry field names.
    config_param :strip_underscores, :bool, default: false
    # Tag attached to every emitted event.
    config_param :tag, :string

    attr_reader :tag

    # Opens the journal, applies the filters and seeks to the start position.
    #
    # The parsed config params (instance variables populated by `super`) are
    # used rather than the raw `conf` strings: `conf['read_from_head']` is the
    # string "false" when the option is explicitly disabled, and any String is
    # truthy in Ruby, which would wrongly seek to the head.
    #
    # @param conf the plugin configuration element supplied by Fluentd
    def configure(conf)
      super
      @pos_writer = PosWriter.new(@pos_file)
      @journal = Systemd::Journal.new(path: path)
      journal.filter(*filters)
      journal.seek(@pos_writer.cursor || (@read_from_head ? :head : :tail))
    end

    # Starts the position writer and spawns the thread that reads the journal.
    def start
      super
      @running = true
      pos_writer.start
      @thread = Thread.new(&method(:run))
    end

    # Signals the reader loop to stop, waits for the thread to finish and then
    # shuts the position writer down.
    def shutdown
      @running = false
      # The thread only exists once #start has run.
      @thread.join if @thread
      pos_writer.shutdown
      super
    end

    private

    attr_reader :journal, :running, :path, :pos_writer, :strip_underscores

    # Thread body: emits each watched entry with its realtime timestamp
    # (converted with #to_i) as the event time.
    def run
      watch do |entry|
        router.emit(tag, entry.realtime_timestamp.to_i, formatted(entry))
      end
    end

    # Converts a journal entry into the record hash that is emitted.
    #
    # @param entry a journal entry responding to #to_h
    # @return [Hash] the entry's fields, with leading underscores removed from
    #   the keys when strip_underscores is enabled
    def formatted(entry)
      return entry.to_h unless strip_underscores
      # The pattern is anchored at the start of the string, so it can match at
      # most once; `sub` is sufficient.
      Hash[entry.to_h.map { |k, v| [k.sub(/\A_+/, ''), v] }]
    end

    # Yields journal entries until #shutdown clears the running flag, and
    # reports the journal cursor to the position writer after each entry.
    def watch
      while running
        # Re-check `running` whenever the wait returns a falsy value.
        # NOTE(review): 1_000_000 is presumably a timeout in microseconds
        # (one second) - confirm against the systemd-journal gem docs.
        next unless journal.wait(1_000_000)
        while journal.move_next && running
          yield journal.current_entry
          pos_writer.update(journal.cursor)
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-systemd-0.0.2 lib/fluent/plugin/in_systemd.rb