Sha256: 8b0ffbac56479d6b636318249c69d9bf455b1f7024ab9a923a7f529b4e46c574
Contents?: true
Size: 1.64 KB
Versions: 1
Compression:
Stored size: 1.64 KB
Contents
# Fluentd buffered output plugin that enriches each record with GeoIP City
# data looked up from one of the record's fields, then re-emits the record.
#
# Configuration parameters visible in this class:
#   geoip_database        path of the GeoIP City database file
#   geoip_lookup_key      record field holding the address to look up
#   enable_key_<name>     for each <name> in GEOIP_KEYS, the record field
#                         that receives the looked-up value
# At least one of remove_tag_prefix, remove_tag_suffix, add_tag_prefix or
# add_tag_suffix must also be configured.
class Fluent::GeoipOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('geoip', self)

  # GeoIP result fields that may be copied into a record.
  GEOIP_KEYS = %w(city latitude longitude country_code3 country_code country_name dma_code area_code region).freeze

  # Prefix of the configuration keys that enable a GeoIP field.
  ENABLE_KEY_PREFIX = 'enable_key_'.freeze

  config_param :geoip_database, :string, :default => 'data/GeoLiteCity.dat'
  config_param :geoip_lookup_key, :string, :default => 'host'

  include Fluent::HandleTagNameMixin
  include Fluent::SetTagKeyMixin
  config_set_default :include_tag_key, false

  # Loads the geoip library when the plugin is instantiated rather than when
  # this file is loaded.
  def initialize
    require 'geoip'
    super
  end

  # Builds the GeoIP-field => record-field map from the enable_key_*
  # settings and opens the GeoIP database.
  #
  # Raises Fluent::ConfigError when an enable_key_* setting names a field
  # that is not in GEOIP_KEYS, or when no tag rewriting option is set.
  def configure(conf)
    super

    @geoip_keys_map = {}
    conf.keys.select { |k| k.start_with?(ENABLE_KEY_PREFIX) }.each do |key|
      geoip_key_name = key[ENABLE_KEY_PREFIX.length..-1]
      unless GEOIP_KEYS.include?(geoip_key_name)
        raise Fluent::ConfigError, "geoip: unsupported key #{geoip_key_name}"
      end
      @geoip_keys_map.store(geoip_key_name, conf[key])
    end

    # NOTE(review): presumably required so the re-emitted event carries a
    # different tag and does not match this output again -- confirm.
    tag_options = [@remove_tag_prefix, @remove_tag_suffix, @add_tag_prefix, @add_tag_suffix]
    if tag_options.none?
      raise Fluent::ConfigError, "geoip: missing remove_tag_prefix, remove_tag_suffix, add_tag_prefix or add_tag_suffix."
    end

    # Arguments are passed through unchanged; see the geoip-c documentation
    # for the meaning of the :memory and false options.
    @geoip = GeoIP::City.new(@geoip_database, :memory, false)
  end

  # Serializes one event as a msgpack [tag, time, record] triple for the
  # buffer chunk.
  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  # Enriches and re-emits every event stored in the chunk.
  #
  # A record whose address cannot be resolved is emitted unchanged instead of
  # raising: an exception here would abort the chunk after some of its
  # records had already been emitted.
  def write(chunk)
    chunk.msgpack_each do |tag, time, record|
      add_geoip_fields(record)
      Fluent::Engine.emit(tag, time, record)
    end
  end

  private

  # Copies the enabled GeoIP fields for the record's lookup address into the
  # record (in place) and returns the record. Leaves the record untouched
  # when the lookup field is missing, is not a String, or has no database
  # entry.
  def add_geoip_fields(record)
    address = record[@geoip_lookup_key]
    # NOTE(review): geoip-c's look_up is expected to accept only a String
    # and to return nil for unknown addresses -- confirm against its docs.
    result = address.is_a?(String) ? @geoip.look_up(address) : nil

    unless result.nil?
      @geoip_keys_map.each do |geoip_key, record_key|
        record.store(record_key, result[geoip_key.to_sym])
      end
    end

    # Per-record tracing is kept at debug level so normal operation does not
    # write one log line per event.
    $log.debug "geoip: record:#{record}, result:#{result}"
    record
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
fluent-plugin-geoip-0.0.2 | lib/fluent/plugin/out_geoip.rb |