lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.9 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.10

- old
+ new

@@ -4,19 +4,74 @@ class TreasureDataLogOutput < BufferedOutput Plugin.register_output('tdlog', self) IMPORT_SIZE_LIMIT = 32*1024*1024 + class Anonymizer + include Configurable + end + + class RawAnonymizer < Anonymizer + def anonymize(obj) + if obj == nil + nil + elsif obj.is_a?(String) + anonymize_raw obj + elsif obj.is_a?(Numeric) + anonymize_raw obj.to_s + else + # boolean, array, map + anonymize_raw MessagePack.pack(obj) + end + end + end + + class MD5Anonymizer < RawAnonymizer + def anonymize_raw(raw) + Digest::MD5.hexdigest(raw) + end + end + + class IPXORAnonymizer < RawAnonymizer + config_param :xor_key, :string + + def configure(conf) + super + + a1, a2, a3, a4 = @xor_key.split('.') + @xor_keys = [a1.to_i, a2.to_i, a3.to_i, a4.to_i] + + if @xor_keys == [0, 0, 0, 0] + raise ConfigError, "'xor_key' must be IPv4 address" + end + end + + def anonymize_raw(raw) + m = /\A(\d+)\.(\d+)\.(\d+)\.(\d+)/.match(raw) + return nil unless m + + k1, k2, k3, k4 = @xor_keys + + o1 = m[1].to_i ^ k1 + o2 = m[2].to_i ^ k2 + o3 = m[3].to_i ^ k3 + o4 = m[4].to_i ^ k4 + + "#{o1}.#{o2}.#{o3}.#{o4}" + end + end + def initialize require 'fileutils' require 'tempfile' require 'zlib' require 'net/http' require 'json' require 'cgi' # CGI.escape require 'time' # Time#rfc2822 require 'td-client' + require 'digest/md5' super @tmpdir = '/tmp/fluent/tdlog' @apikey = nil @key = nil @key_num_limit = 5120 # TODO @@ -88,10 +143,32 @@ raise ConfigError, "Invalid table name #{table.inspect}: #{$!}: #{conf}" end @key = "#{database}.#{table}" end + @anonymizes = {} + conf.elements.select {|e| + e.name == 'anonymize' + }.each {|e| + key = e['key'] + method = e['method'] + + case method + when 'md5' + scr = MD5Anonymizer.new + when 'ip_xor' + scr = IPXORAnonymizer.new + else + raise ConfigError, "Unknown anonymize method: #{method}" + end + + scr.configure(e) + + @anonymizes[key] = scr + } + @anonymizes = nil if @anonymizes.empty? + @http_proxy = conf['http_proxy'] end def start super @@ -121,9 +198,17 @@ def format_stream(tag, es) out = '' off = out.bytesize es.each {|time,record| begin + if @anonymizes + @anonymizes.each_pair {|key,scr| + if value = record[key] + record[key] = scr.anonymize(value) + end + } + end + record['time'] = time if record.size > @key_num_limit raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record end