Sha256: 8d8985d915f6054400e3a04b79cf11942d206e113a2a0341294e7bdf4f9e2931

Contents?: true

Size: 1.27 KB

Versions: 1

Compression:

Stored size: 1.27 KB

Contents

require 'fluent/plugin/output'
require 'fluent/plugin/rename_key_util'

# Fluentd output plugin registered under the name +rename_key+.
#
# For every incoming event it passes the record through +rename_key+ and then
# +replace_key+ and re-emits the result through the router under a rewritten
# tag: the optional +remove_tag_prefix+ is stripped from the front of the
# original tag and +append_tag+ is appended after a dot.
#
# NOTE(review): +create_rename_rules+, +create_replace_rules+, +rename_key+
# and +replace_key+ are not defined in this file; presumably they come from
# the included Fluent::Plugin::RenameKeyUtil, which is also expected to
# populate +@rename_rules+ / +@replace_rules+ -- confirm against that module.
class Fluent::Plugin::RenameKeyOutput < Fluent::Plugin::Output
  Fluent::Plugin.register_output 'rename_key', self

  helpers :event_emitter

  include Fluent::Plugin::RenameKeyUtil

  # Tag suffix used when the configuration does not set +append_tag+.
  DEFAULT_APPEND_TAG = 'key_renamed'

  desc 'Specify and remove tag prefix.'
  config_param :remove_tag_prefix, :string, default: nil
  desc "Append custom tag postfix (default: #{DEFAULT_APPEND_TAG})."
  config_param :append_tag, :string, default: DEFAULT_APPEND_TAG
  desc 'Deep rename/replace operation.'
  config_param :deep_rename, :bool, default: true

  # Builds the rename/replace rules from the configuration and prepares the
  # tag-prefix matcher.
  #
  # @param conf the plugin configuration section handed over by Fluentd
  # @raise [Fluent::ConfigError] when neither rename nor replace rules exist
  def configure(conf)
    super

    create_rename_rules(conf)
    create_replace_rules(conf)

    raise Fluent::ConfigError, 'No rename nor replace rules are given' if @rename_rules.empty? && @replace_rules.empty?

    # The configured prefix string is replaced by a regexp that matches it
    # (plus one optional trailing dot) at the very start of a tag. \A is used
    # rather than ^ so the match can never begin after an embedded newline.
    @remove_tag_prefix = /\A#{Regexp.escape(@remove_tag_prefix)}\.?/ if @remove_tag_prefix
  end

  # Reports to Fluentd that this plugin may be used with multiple workers.
  #
  # @return [true]
  def multi_workers_ready?
    true
  end

  # Re-emits every event of the stream with renamed/replaced keys under the
  # rewritten tag.
  #
  # @param tag [String] tag of the incoming event stream
  # @param es event stream yielding +time, record+ pairs from +each+
  def process(tag, es)
    # The outgoing tag depends only on the incoming tag and the configuration,
    # so it is computed once per stream instead of once per record.
    stripped_tag = @remove_tag_prefix ? tag.sub(@remove_tag_prefix, '') : tag
    # When prefix removal leaves an empty tag the joined string starts with a
    # dot; drop that leading dot.
    new_tag = "#{stripped_tag}.#{@append_tag}".sub(/\A\./, '')

    es.each do |time, record|
      new_record = replace_key(rename_key(record))
      router.emit(new_tag, time, new_record)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-rename-key-0.4.1 lib/fluent/plugin/out_rename_key.rb