Sha256: 4749327a154eb693611e5690a394a4f7700a595c5643b5f687e0d76b73b618bf

Contents?: true

Size: 1.96 KB

Versions: 3

Compression:

Stored size: 1.96 KB

Contents

class Fluent::URIParserFilter < Fluent::Filter
  Fluent::Plugin.register_filter("uri_parser", self)

  config_param :key_name, :string
  config_param :hash_value_field, :string, default: nil
  config_param :inject_key_prefix, :string, default: nil
  config_param :suppress_parse_error_log, :bool, default: false
  config_param :ignore_key_not_exist, :bool, default: false
  config_param :ignore_nil, :bool, default: false

  config_param :out_key_scheme, :string, default: nil
  config_param :out_key_host, :string, default: nil
  config_param :out_key_port, :string, default: nil
  config_param :out_key_path, :string, default: nil
  config_param :out_key_query, :string, default: nil
  config_param :out_key_fragment, :string, default: nil

  def initialize
    super
    require "addressable/uri"
  end

  def filter_stream(tag, es)
    new_es = Fluent::MultiEventStream.new

    es.each do |time, record|
      raw_value = record[@key_name]
      if raw_value.nil?
        new_es.add(time, record) unless @ignore_key_not_exist
        next
      end

      begin
        uri = Addressable::URI.parse(raw_value)

        values = {}
        values[@out_key_scheme] = uri.scheme if @out_key_scheme
        values[@out_key_host] = uri.host if @out_key_host
        values[@out_key_port] = uri.inferred_port if @out_key_port
        values[@out_key_path] = uri.path if @out_key_path
        values[@out_key_query] = uri.query if @out_key_query
        values[@out_key_fragment] = uri.fragment if @out_key_fragment
        values.reject! {|_, v| v.nil? } if @ignore_nil

        unless values.empty?
          if @inject_key_prefix
            values = Hash[values.map{|k,v| [ @inject_key_prefix + k, v ]}]
          end
          r = @hash_value_field ? { @hash_value_field => values } : values
          record = record.merge(r)
        end

        new_es.add(time, record)
      rescue => e
        log.warn "parse failed #{e.message}" unless @suppress_parse_error_log
      end
    end

    new_es
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
fluent-plugin-uri-parser-0.2.0 lib/fluent/plugin/filter_uri_parser.rb
fluent-plugin-uri-parser-0.1.0 lib/fluent/plugin/filter_uri_parser.rb
fluent-plugin-uri-parser-0.0.2 lib/fluent/plugin/filter_uri_parser.rb