Sha256: d78e19bdf62ae6d900115158371b9aa9b78ecd128454681da9a82fbd9e86793d

Contents?: true

Size: 1.68 KB

Versions: 1

Compression:

Stored size: 1.68 KB

Contents

require 'fluent/plugin/output'
require 'fluent/plugin/geoip'

# Buffered Fluentd output plugin registered as 'geoip'.
#
# Each buffered record is passed through Fluent::GeoIP#add_geoip_field and the
# resulting events are re-emitted through the router, either under their
# original tag or under the configured +tag+ (which may contain placeholders).
class Fluent::Plugin::GeoipOutput < Fluent::Plugin::Output
  Fluent::Plugin.register_output('geoip', self)

  helpers :event_emitter, :inject, :compat_parameters

  # Paths to the bundled GeoIP databases, resolved relative to this file.
  config_param :geoip_database, :string, default: File.expand_path('../../../data/GeoLiteCity.dat', __dir__)
  config_param :geoip2_database, :string, default: File.expand_path('../../../data/GeoLite2-City.mmdb', __dir__)
  # Record key used for the lookup (consumed by Fluent::GeoIP, not by this class).
  config_param :geoip_lookup_key, :string, default: 'host'
  # Optional tag for re-emitted events; when nil the incoming tag is reused.
  config_param :tag, :string, default: nil
  config_param :skip_adding_null_record, :bool, default: false

  config_set_default :@log_level, "warn"

  config_param :backend_library, :enum, list: Fluent::GeoIP::BACKEND_LIBRARIES, default: :geoip2_c
  config_section :buffer do
    config_set_default :@type, :memory
    config_set_default :chunk_keys, ['tag']
    config_set_default :flush_interval, 0
  end

  # Validates the configuration and builds the GeoIP helper.
  #
  # @param conf the plugin configuration element handed over by Fluentd
  # @raise [Fluent::ConfigError] when the buffer chunk keys do not include 'tag'
  def configure(conf)
    compat_parameters_convert(conf, :buffer, :inject, default_chunk_key: 'tag')
    super
    # #write relies on every event in a chunk sharing one tag, so chunking by
    # tag is mandatory.
    raise Fluent::ConfigError, "chunk key must include 'tag'" unless @chunk_key_tag
    placeholder_validate!(:tag, @tag) if @tag
    @geoip = Fluent::GeoIP.new(self, conf)
  end

  # Serializes one event as a msgpack-encoded [tag, time, record] triple after
  # applying the inject helper to the record.
  def format(tag, time, record)
    record = inject_values_to_record(tag, time, record)
    [tag, time, record].to_msgpack
  end

  # NOTE(review): presumably signals to Fluentd that #format emits msgpack so
  # chunks can be iterated with chunk.each — confirm against the Output API.
  def formatted_to_msgpack_binary
    true
  end

  # Adds GeoIP fields to every record in the chunk and re-emits the events as
  # a single stream.
  #
  # @param chunk buffer chunk whose entries are the triples written by #format
  def write(chunk)
    es = Fluent::MultiEventStream.new
    tag = ""
    chunk.each do |_tag, time, record|
      # Remember the tag of the last entry; it is used when no +tag+ is configured.
      tag = _tag
      es.add(time, @geoip.add_geoip_field(record))
    end
    # A configured tag overrides the incoming one, with placeholders expanded
    # from the chunk metadata.
    tag = extract_placeholders(@tag, chunk.metadata) if @tag
    router.emit_stream(tag, es)
  end

  # Declares that this plugin may run under Fluentd's multi-worker mode.
  def multi_workers_ready?
    true
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-plugin-geoip-1.1.0 lib/fluent/plugin/out_geoip.rb