lib/fluent/plugin/out_tdlog.rb: fluent-plugin-td 0.10.20 vs 0.10.21
- old (0.10.20)
+ new (0.10.21)
@@ -1,358 +1,347 @@
require 'td-client'
module Fluent
+ class TreasureDataLogOutput < BufferedOutput
+ Plugin.register_output('tdlog', self)
+ IMPORT_SIZE_LIMIT = 32 * 1024 * 1024
-class TreasureDataLogOutput < BufferedOutput
- Plugin.register_output('tdlog', self)
+ class Anonymizer
+ include Configurable
+ end
- IMPORT_SIZE_LIMIT = 32*1024*1024
-
- class Anonymizer
- include Configurable
- end
-
- class RawAnonymizer < Anonymizer
- def anonymize(obj)
- if obj == nil
- nil
- elsif obj.is_a?(String)
- anonymize_raw obj
- elsif obj.is_a?(Numeric)
- anonymize_raw obj.to_s
- else
- # boolean, array, map
- anonymize_raw MessagePack.pack(obj)
+ class RawAnonymizer < Anonymizer
+ def anonymize(obj)
+ if obj.nil?
+ nil
+ elsif obj.is_a?(String)
+ anonymize_raw obj
+ elsif obj.is_a?(Numeric)
+ anonymize_raw obj.to_s
+ else
+ # boolean, array, map
+ anonymize_raw MessagePack.pack(obj)
+ end
end
end
- end
- class MD5Anonymizer < RawAnonymizer
- def anonymize_raw(raw)
- Digest::MD5.hexdigest(raw)
+ class MD5Anonymizer < RawAnonymizer
+ def anonymize_raw(raw)
+ Digest::MD5.hexdigest(raw)
+ end
end
- end
- class IPXORAnonymizer < RawAnonymizer
- config_param :xor_key, :string
+ class IPXORAnonymizer < RawAnonymizer
+ config_param :xor_key, :string
- def configure(conf)
- super
+ def configure(conf)
+ super
- a1, a2, a3, a4 = @xor_key.split('.')
- @xor_keys = [a1.to_i, a2.to_i, a3.to_i, a4.to_i]
+ a1, a2, a3, a4 = @xor_key.split('.')
+ @xor_keys = [a1.to_i, a2.to_i, a3.to_i, a4.to_i]
- if @xor_keys == [0, 0, 0, 0]
- raise ConfigError, "'xor_key' must be IPv4 address"
+ if @xor_keys == [0, 0, 0, 0]
+ raise ConfigError, "'xor_key' must be IPv4 address"
+ end
end
- end
- def anonymize_raw(raw)
- m = /\A(\d+)\.(\d+)\.(\d+)\.(\d+)/.match(raw)
- return nil unless m
+ def anonymize_raw(raw)
+ m = /\A(\d+)\.(\d+)\.(\d+)\.(\d+)/.match(raw)
+ return nil unless m
- k1, k2, k3, k4 = @xor_keys
+ k1, k2, k3, k4 = @xor_keys
- o1 = m[1].to_i ^ k1
- o2 = m[2].to_i ^ k2
- o3 = m[3].to_i ^ k3
- o4 = m[4].to_i ^ k4
+ o1 = m[1].to_i ^ k1
+ o2 = m[2].to_i ^ k2
+ o3 = m[3].to_i ^ k3
+ o4 = m[4].to_i ^ k4
- "#{o1}.#{o2}.#{o3}.#{o4}"
+ "#{o1}.#{o2}.#{o3}.#{o4}"
+ end
end
- end
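
Note: the ip_xor method XORs each octet of a matched IPv4 address with the
corresponding octet of xor_key, which makes the mapping reversible with the
same key. A standalone sketch of the transform (ip_xor below is a
hypothetical helper, not part of the plugin):

    def ip_xor(ip, key)
      octets = ip.split('.').map(&:to_i)
      keys   = key.split('.').map(&:to_i)
      octets.zip(keys).map { |o, k| o ^ k }.join('.')
    end

    ip_xor('192.168.10.1', '0.0.255.255')    # => "192.168.245.254"
    ip_xor('192.168.245.254', '0.0.255.255') # => "192.168.10.1" (reversible)
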
- # To support log_level option since Fluentd v0.10.43
- unless method_defined?(:log)
- define_method(:log) { $log }
- end
+ # To support log_level option since Fluentd v0.10.43
+ unless method_defined?(:log)
+ define_method(:log) { $log }
+ end
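
Note: this shim is a backward-compatibility guard. Fluentd v0.10.43
introduced a per-plugin log method (backing the log_level option); on older
cores where method_defined?(:log) is false, the plugin falls back to the
global $log so the same code runs on both.
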
- config_param :endpoint, :string, :default => TreasureData::API::NEW_DEFAULT_ENDPOINT
+ config_param :endpoint, :string, :default => TreasureData::API::NEW_DEFAULT_ENDPOINT
- config_param :connect_timeout, :integer, :default => nil
- config_param :read_timeout, :integer, :default => nil
- config_param :send_timeout, :integer, :default => nil
- config_set_default :buffer_type, 'file'
- config_set_default :flush_interval, 300
+ config_param :connect_timeout, :integer, :default => nil
+ config_param :read_timeout, :integer, :default => nil
+ config_param :send_timeout, :integer, :default => nil
+ config_set_default :buffer_type, 'file'
+ config_set_default :flush_interval, 300
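
Note: with these defaults the plugin buffers to file and flushes every 300
seconds, and (per the configure method below) buffer_chunk_limit is capped
at the 32 MB IMPORT_SIZE_LIMIT unless set explicitly. A minimal match
section exercising these parameters, with placeholder values:

    <match td.*.*>
      type tdlog
      apikey YOUR_TD_API_KEY
      auto_create_table true
      use_ssl true
      buffer_type file
      buffer_path /var/log/fluent/buffer/td
      flush_interval 300
    </match>
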
- def initialize
- require 'fileutils'
- require 'tempfile'
- require 'zlib'
- require 'net/http'
- require 'json'
- require 'cgi' # CGI.escape
- require 'time' # Time#rfc2822
- require 'digest/md5'
- require 'stringio'
- super
- @tmpdir = nil
- @apikey = nil
- @key = nil
- @key_num_limit = 5120 # TODO
- @record_size_limit = 32*1024*1024 # TODO
- @table_list = {}
- @auto_create_table = true
- @use_ssl = true
- @empty_gz_data = create_empty_gz_data
- end
+ def initialize
+ require 'fileutils'
+ require 'tempfile'
+ require 'zlib'
+ require 'net/http'
+ require 'json'
+ require 'cgi' # CGI.escape
+ require 'time' # Time#rfc2822
+ require 'digest/md5'
+ require 'stringio'
+ super
+ @tmpdir = nil
+ @apikey = nil
+ @key = nil
+ @key_num_limit = 512 # TODO: Our one-time import has the restriction about the number of record keys.
+ @record_size_limit = 32 * 1024 * 1024 # TODO
+ @table_list = {}
+ @auto_create_table = true
+ @use_ssl = true
+ @empty_gz_data = TreasureData::API.create_empty_gz_data
+ end
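
Note: two behavioral changes land in initialize. The per-record key limit
drops from 5120 to 512, matching the restriction the comment attributes to
the one-time import API, and the empty-gzip sentinel is now built by
TreasureData::API.create_empty_gz_data from td-client, replacing the local
create_empty_gz_data helper removed further down.
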
- def configure(conf)
- super
+ def configure(conf)
+ super
- # overwrite default value of buffer_chunk_limit
- if @buffer.respond_to?(:buffer_chunk_limit=) && !conf['buffer_chunk_limit']
- @buffer.buffer_chunk_limit = IMPORT_SIZE_LIMIT
- end
+ # overwrite default value of buffer_chunk_limit
+ if @buffer.respond_to?(:buffer_chunk_limit=) && !conf['buffer_chunk_limit']
+ @buffer.buffer_chunk_limit = IMPORT_SIZE_LIMIT
+ end
- if conf.has_key?('tmpdir')
- @tmpdir = conf['tmpdir']
- FileUtils.mkdir_p(@tmpdir)
- end
+ if conf.has_key?('tmpdir')
+ @tmpdir = conf['tmpdir']
+ FileUtils.mkdir_p(@tmpdir)
+ end
- @apikey = conf['apikey']
- unless @apikey
- raise ConfigError, "'apikey' parameter is required on tdlog output"
- end
+ @apikey = conf['apikey']
+ unless @apikey
+ raise ConfigError, "'apikey' parameter is required on tdlog output"
+ end
- if auto_create_table = conf['auto_create_table']
- if auto_create_table.empty?
- @auto_create_table = true
- else
- @auto_create_table = Config.bool_value(auto_create_table)
- if @auto_create_table == nil
- raise ConfigError, "'true' or 'false' is required for auto_create_table option on tdlog output"
+ if auto_create_table = conf['auto_create_table']
+ if auto_create_table.empty?
+ @auto_create_table = true
+ else
+ @auto_create_table = Config.bool_value(auto_create_table)
+ if @auto_create_table == nil
+ raise ConfigError, "'true' or 'false' is required for auto_create_table option on tdlog output"
+ end
end
end
- end
- if use_ssl = conf['use_ssl']
- if use_ssl.empty?
- @use_ssl = true
- else
- @use_ssl = Config.bool_value(use_ssl)
- if @use_ssl == nil
- raise ConfigError, "'true' or 'false' is required for use_ssl option on tdlog output"
+ if use_ssl = conf['use_ssl']
+ if use_ssl.empty?
+ @use_ssl = true
+ else
+ @use_ssl = Config.bool_value(use_ssl)
+ if @use_ssl == nil
+ raise ConfigError, "'true' or 'false' is required for use_ssl option on tdlog output"
+ end
end
end
- end
- database = conf['database']
- table = conf['table']
- if database && table
- validate_database_and_table_name(database, table, conf)
- @key = "#{database}.#{table}"
- end
+ database = conf['database']
+ table = conf['table']
+ if database && table
+ validate_database_and_table_name(database, table, conf)
+ @key = "#{database}.#{table}"
+ end
- @anonymizes = {}
- conf.elements.select {|e|
- e.name == 'anonymize'
- }.each {|e|
- key = e['key']
- method = e['method']
+ @anonymizes = {}
+ conf.elements.select { |e|
+ e.name == 'anonymize'
+ }.each { |e|
+ key = e['key']
+ method = e['method']
- case method
- when 'md5'
- scr = MD5Anonymizer.new
- when 'ip_xor'
- scr = IPXORAnonymizer.new
- else
- raise ConfigError, "Unknown anonymize method: #{method}"
- end
+ case method
+ when 'md5'
+ scr = MD5Anonymizer.new
+ when 'ip_xor'
+ scr = IPXORAnonymizer.new
+ else
+ raise ConfigError, "Unknown anonymize method: #{method}"
+ end
- scr.configure(e)
+ scr.configure(e)
- @anonymizes[key] = scr
- }
- @anonymizes = nil if @anonymizes.empty?
+ @anonymizes[key] = scr
+ }
+ @anonymizes = nil if @anonymizes.empty?
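
Note: each <anonymize> element names a record key and a method (md5 or
ip_xor); ip_xor additionally requires the xor_key parameter validated above.
For example, with placeholder key names:

    <anonymize>
      key     user_id
      method  md5
    </anonymize>
    <anonymize>
      key     remote_addr
      method  ip_xor
      xor_key 210.10.10.10
    </anonymize>
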
- @http_proxy = conf['http_proxy']
- @user_agent = "fluent-plugin-td: 0.10.20" # TODO: automatic increment version
- end
+ @http_proxy = conf['http_proxy']
+ @user_agent = "fluent-plugin-td: 0.10.21" # TODO: automatic increment version
+ end
- def start
- super
+ def start
+ super
- client_opts = {
- :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent, :endpoint => @endpoint,
- :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout
- }
- @client = TreasureData::Client.new(@apikey, client_opts)
+ client_opts = {
+ :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent, :endpoint => @endpoint,
+ :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout
+ }
+ @client = TreasureData::Client.new(@apikey, client_opts)
- if @key
- if @auto_create_table
- database, table = @key.split('.',2)
- ensure_database_and_table(database, table)
- else
- check_table_exists(@key)
+ if @key
+ if @auto_create_table
+ database, table = @key.split('.',2)
+ ensure_database_and_table(database, table)
+ else
+ check_table_exists(@key)
+ end
end
end
- end
- def emit(tag, es, chain)
- if @key
- key = @key
- else
- database, table = tag.split('.')[-2,2]
- database = TreasureData::API.normalize_database_name(database)
- table = TreasureData::API.normalize_table_name(table)
- key = "#{database}.#{table}"
- end
+ def emit(tag, es, chain)
+ if @key
+ key = @key
+ else
+ database, table = tag.split('.')[-2,2]
+ database = TreasureData::API.normalize_database_name(database)
+ table = TreasureData::API.normalize_table_name(table)
+ key = "#{database}.#{table}"
+ end
- unless @auto_create_table
- check_table_exists(key)
+ unless @auto_create_table
+ check_table_exists(key)
+ end
+
+ super(tag, es, chain, key)
end
- super(tag, es, chain, key)
- end
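
Note: when no fixed database/table pair is configured, emit derives both
from the last two tag components and normalizes them through td-client
before building the buffer key. For a hypothetical tag:

    tag = 'td.production.access_log'
    database, table = tag.split('.')[-2, 2]
    # database => "production", table => "access_log"
    # each is then passed through TreasureData::API.normalize_database_name /
    # normalize_table_name, and the chunk key becomes "production.access_log"
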
+ def format_stream(tag, es)
+ out = ''
+ off = out.bytesize
+ es.each { |time, record|
+ begin
+ if @anonymizes
+ @anonymizes.each_pair { |key, scr|
+ if value = record[key]
+ record[key] = scr.anonymize(value)
+ end
+ }
+ end
- def format_stream(tag, es)
- out = ''
- off = out.bytesize
- es.each {|time,record|
- begin
- if @anonymizes
- @anonymizes.each_pair {|key,scr|
- if value = record[key]
- record[key] = scr.anonymize(value)
- end
- }
+ record['time'] = time
+
+ if record.size > @key_num_limit
+ raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
+ end
+ rescue => e
+ # TODO (a) Remove the transaction mechanism of fluentd
+ # or (b) keep transaction boundaries in in/out_forward.
+ # This code disables the transaction mechanism (a).
+ log.error "#{e}: #{summarize_record(record)}"
+ log.error_backtrace e.backtrace
+ next
end
- record['time'] = time
+ begin
+ record.to_msgpack(out)
+ rescue RangeError
+ TreasureData::API.normalized_msgpack(record, out)
+ end
- if record.size > @key_num_limit
- raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
+ noff = out.bytesize
+ sz = noff - off
+ if sz > @record_size_limit
+ # TODO don't raise error
+ #raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
+ log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
end
+ off = noff
+ }
+ out
+ end
- rescue
- # TODO (a) Remove the transaction mechanism of fluentd
- # or (b) keep transaction boundaries in in/out_forward.
- # This code disables the transaction mechanism (a).
- log.error "#{$!}: #{summarize_record(record)}"
- log.error_backtrace $!.backtrace
- next
+ def summarize_record(record)
+ json = Yajl.dump(record)
+ if json.size > 100
+ json[0..97] + "..."
+ else
+ json
end
-
- begin
- record.to_msgpack(out)
- rescue RangeError
- TreasureData::API.normalized_msgpack(record, out)
- end
-
- noff = out.bytesize
- sz = noff - off
- if sz > @record_size_limit
- # TODO don't raise error
- #raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
- log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
- end
- off = noff
- }
- out
- end
-
- def summarize_record(record)
- json = record.to_json
- if json.size > 100
- json[0..97]+"..."
- else
- json
end
- end
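
Note: in format_stream, to_msgpack raises RangeError when a record holds an
integer wider than msgpack's 64-bit formats; the rescue retries through
TreasureData::API.normalized_msgpack from td-client, which reshapes such
values into an encodable form. A sketch of the failure mode, with an
illustrative record only:

    require 'msgpack'

    record = { 'value' => 2**80 }  # exceeds msgpack's 64-bit integer range
    begin
      record.to_msgpack
    rescue RangeError
      # out_tdlog falls back to TreasureData::API.normalized_msgpack(record, out)
    end
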
- def write(chunk)
- unique_id = chunk.unique_id
- database, table = chunk.key.split('.',2)
+ def write(chunk)
+ unique_id = chunk.unique_id
+ database, table = chunk.key.split('.', 2)
- FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
- f = Tempfile.new("tdlog-", @tmpdir)
- w = Zlib::GzipWriter.new(f)
+ FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
+ f = Tempfile.new("tdlog-", @tmpdir)
+ w = Zlib::GzipWriter.new(f)
- chunk.write_to(w)
- w.finish
- w = nil
+ chunk.write_to(w)
+ w.finish
+ w = nil
- size = f.pos
- f.pos = 0
- upload(database, table, f, size, unique_id)
+ size = f.pos
+ f.pos = 0
+ upload(database, table, f, size, unique_id)
- ensure
- w.close if w
- f.close if f
- end
+ ensure
+ w.close if w
+ f.close if f
+ end
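
Note: write gzips the chunk into a tempfile, records the compressed size,
rewinds, and uploads. Clearing w after finish is deliberate: GzipWriter#finish
flushes the gzip trailer without closing the underlying file, so the ensure
clause closes the writer only on the error path and never closes the stream
twice. The same pattern in isolation:

    require 'tempfile'
    require 'zlib'

    begin
      f = Tempfile.new('example-')
      w = Zlib::GzipWriter.new(f)
      w.write('payload')
      w.finish          # flush the gzip trailer; the tempfile stays open
      w = nil           # nothing left for ensure to close on success
      size = f.pos      # compressed byte count
      f.pos = 0         # rewind so the consumer reads from the start
    ensure
      w.close if w      # reached with a live writer only if something failed
      f.close if f
    end
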
- def create_empty_gz_data
- io = StringIO.new
- Zlib::GzipWriter.new(io).close
- io.string
- end
+ def upload(database, table, io, size, unique_id)
+ unique_str = unique_id.unpack('C*').map { |x| "%02x" % x }.join
+ log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
- def upload(database, table, io, size, unique_id)
- unique_str = unique_id.unpack('C*').map {|x| "%02x" % x }.join
- log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
-
- begin
begin
- start = Time.now
- @client.import(database, table, "msgpack.gz", io, size, unique_str)
- rescue TreasureData::NotFoundError
- unless @auto_create_table
- raise $!
+ begin
+ start = Time.now
+ @client.import(database, table, "msgpack.gz", io, size, unique_str)
+ rescue TreasureData::NotFoundError => e
+ unless @auto_create_table
+ raise e
+ end
+ ensure_database_and_table(database, table)
+ io.pos = 0
+ retry
end
- ensure_database_and_table(database, table)
- io.pos = 0
- retry
+ rescue => e
+ elapsed = Time.now - start
+ ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{$!} (#{size} bytes; #{elapsed} seconds)")
+ ne.set_backtrace(e.backtrace)
+ raise ne
end
- rescue => e
- elapsed = Time.now - start
- ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{$!} (#{size} bytes; #{elapsed} seconds)")
- ne.set_backtrace(e.backtrace)
- raise ne
end
- end
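
Note: on TreasureData::NotFoundError with auto_create_table enabled, upload
creates the missing database/table and retries; io.pos = 0 before retry is
essential because the failed attempt may already have read part of the gzip
stream. The hex unique_str derived from the chunk's unique_id travels with
every attempt, giving the import endpoint a stable identifier intended to
deduplicate retried uploads.
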
- def check_table_exists(key)
- unless @table_list.has_key?(key)
- database, table = key.split('.',2)
- log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
- io = StringIO.new(@empty_gz_data)
- begin
- @client.import(database, table, "msgpack.gz", io, io.size)
- @table_list[key] = true
- rescue TreasureData::NotFoundError
- raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
- rescue
- log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error=>$!.to_s
- log.debug_backtrace $!
+ def check_table_exists(key)
+ unless @table_list.has_key?(key)
+ database, table = key.split('.', 2)
+ log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
+ io = StringIO.new(@empty_gz_data)
+ begin
+ @client.import(database, table, "msgpack.gz", io, io.size)
+ @table_list[key] = true
+ rescue TreasureData::NotFoundError
+ raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
+ rescue => e
+ log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error => e.to_s
+ log.debug_backtrace e.backtrace
+ end
end
end
- end
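
Note: check_table_exists probes for the table by importing the zero-record
gzip sentinel: NotFoundError pinpoints a missing table, while any other
failure is only logged as a warning so a transient API error cannot block
startup.
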
- def validate_database_and_table_name(database, table, conf)
- begin
- TreasureData::API.validate_database_name(database)
- rescue => e
- raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}"
+ def validate_database_and_table_name(database, table, conf)
+ begin
+ TreasureData::API.validate_database_name(database)
+ rescue => e
+ raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}"
+ end
+ begin
+ TreasureData::API.validate_table_name(table)
+ rescue => e
+ raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}"
+ end
end
- begin
- TreasureData::API.validate_table_name(table)
- rescue => e
- raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}"
- end
- end
- def ensure_database_and_table(database, table)
- log.info "Creating table #{database}.#{table} on TreasureData"
- begin
- @client.create_log_table(database, table)
- rescue TreasureData::NotFoundError
- @client.create_database(database)
- @client.create_log_table(database, table)
- rescue TreasureData::AlreadyExistsError
+ def ensure_database_and_table(database, table)
+ log.info "Creating table #{database}.#{table} on TreasureData"
+ begin
+ @client.create_log_table(database, table)
+ rescue TreasureData::NotFoundError
+ @client.create_database(database)
+ @client.create_log_table(database, table)
+ rescue TreasureData::AlreadyExistsError
+ end
end
end
-end
-
-
end
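
Note on ensure_database_and_table: swallowing TreasureData::AlreadyExistsError
makes table creation safe under races, e.g. when several fluentd workers
flush chunks for the same new table at once; whichever creation call loses
the race simply proceeds as if the table had existed all along.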