lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.9 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.10
- old
+ new
@@ -4,19 +4,74 @@
class TreasureDataLogOutput < BufferedOutput
Plugin.register_output('tdlog', self)
IMPORT_SIZE_LIMIT = 32*1024*1024
+ class Anonymizer # Common base of the anonymizer classes; mixes in Configurable (presumably what gives subclasses config_param/configure -- TODO confirm).
+ include Configurable
+ end
+
+ class RawAnonymizer < Anonymizer
+ def anonymize(obj)
+ if obj == nil
+ nil
+ elsif obj.is_a?(String)
+ anonymize_raw obj
+ elsif obj.is_a?(Numeric)
+ anonymize_raw obj.to_s
+ else
+ # boolean, array, map
+ anonymize_raw MessagePack.pack(obj)
+ end
+ end
+ end
+
+ class MD5Anonymizer < RawAnonymizer
+ def anonymize_raw(raw)
+ Digest::MD5.hexdigest(raw)
+ end
+ end
+
+ class IPXORAnonymizer < RawAnonymizer
+ config_param :xor_key, :string
+
+ def configure(conf)
+ super
+
+ a1, a2, a3, a4 = @xor_key.split('.')
+ @xor_keys = [a1.to_i, a2.to_i, a3.to_i, a4.to_i]
+
+ if @xor_keys == [0, 0, 0, 0]
+ raise ConfigError, "'xor_key' must be IPv4 address"
+ end
+ end
+
+ def anonymize_raw(raw)
+ m = /\A(\d+)\.(\d+)\.(\d+)\.(\d+)/.match(raw)
+ return nil unless m
+
+ k1, k2, k3, k4 = @xor_keys
+
+ o1 = m[1].to_i ^ k1
+ o2 = m[2].to_i ^ k2
+ o3 = m[3].to_i ^ k3
+ o4 = m[4].to_i ^ k4
+
+ "#{o1}.#{o2}.#{o3}.#{o4}"
+ end
+ end
+
def initialize
require 'fileutils'
require 'tempfile'
require 'zlib'
require 'net/http'
require 'json'
require 'cgi' # CGI.escape
require 'time' # Time#rfc2822
require 'td-client'
+ require 'digest/md5'
super
@tmpdir = '/tmp/fluent/tdlog'
@apikey = nil
@key = nil
@key_num_limit = 5120 # TODO
@@ -88,10 +143,32 @@
raise ConfigError, "Invalid table name #{table.inspect}: #{$!}: #{conf}"
end
@key = "#{database}.#{table}"
end
+ @anonymizes = {}
+ conf.elements.select {|e|
+ e.name == 'anonymize'
+ }.each {|e|
+ key = e['key']
+ method = e['method']
+
+ case method
+ when 'md5'
+ scr = MD5Anonymizer.new
+ when 'ip_xor'
+ scr = IPXORAnonymizer.new
+ else
+ raise ConfigError, "Unknown anonymize method: #{method}"
+ end
+
+ scr.configure(e)
+
+ @anonymizes[key] = scr
+ }
+ @anonymizes = nil if @anonymizes.empty?
+
@http_proxy = conf['http_proxy']
end
def start
super
@@ -121,9 +198,17 @@
def format_stream(tag, es)
out = ''
off = out.bytesize
es.each {|time,record|
begin
+ if @anonymizes
+ @anonymizes.each_pair {|key,scr|
+ if value = record[key]
+ record[key] = scr.anonymize(value)
+ end
+ }
+ end
+
record['time'] = time
if record.size > @key_num_limit
raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
end