lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.9.7 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.9.8

- old
+ new

@@ -14,15 +14,19 @@ require 'zlib' require 'net/http' require 'json' require 'cgi' # CGI.escape require 'time' # Time#rfc2822 + require 'td-client' super @tmpdir = '/tmp/fluent/tdlog' @apikey = nil @key = nil + @key_num_limit = 5120 # TODO + @record_size_limit = 32*1024*1024 # TODO @table_list = [] + @auto_create_table = false end def configure(conf) super @@ -46,14 +50,20 @@ @key = "#{database}.#{table}" elsif (database && !table) || (!database && table) raise ConfigError, "'database' and 'table' parameter are required on tdlog output" end - @table_list = get_table_list + if conf['auto_create_table'] + @auto_create_table = true + end + end - if @key && !@table_list.include?(@key) - raise ConfigError, "Table #{@key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it." + def start + super + @client = TreasureData::Client.new(@apikey) + unless @auto_create_table + check_table_exists(@key) end end def emit(tag, es, chain) if @key @@ -65,22 +75,12 @@ return end key = "#{database}.#{table}" end - # check the table exists - unless @table_list.include?(key) - begin - @table_list = get_table_list - rescue - $log.warn "failed to update table list on Treasure Data", :error=>$!.to_s - $log.debug_backtrace $! - end - unless @table_list.include?(key) - database, table = key.split('.',2) - raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it." - end + unless @auto_create_table + check_table_exists(key) end super(tag, es, chain, key) end @@ -88,14 +88,27 @@ true end def format_stream(tag, es) out = '' + off = out.bytesize es.each {|event| record = event.record record['time'] = event.time + + if record.size > @key_num_limit + raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record + end + record.to_msgpack(out) + + noff = out.bytesize + sz = noff - off + if sz > @record_size_limit + raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record + end + off = noff } out end def write(chunk) @@ -120,89 +133,53 @@ w.close if w f.close if f end def upload(database, table, io, size) - http, header = new_http - header['Content-Length'] = size.to_s - header['Content-Type'] = 'application/octet-stream' + $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } - url = "/v3/table/import/#{e database}/#{e table}/msgpack.gz" - - req = Net::HTTP::Put.new(url, header) - if req.respond_to?(:body_stream=) - req.body_stream = io - else # Ruby 1.8 - req.body = io.read + begin + @client.import(database, table, "msgpack.gz", io, size) + rescue TreasureData::NotFoundError + unless @auto_create_table + raise $! + end + $log.info "Creating table #{database}.#{table} on TreasureData" + begin + @client.create_log_table(database, table) + rescue TreasureData::NotFoundError + @client.create_database(database) + @client.create_log_table(database, table) + end + io.pos = 0 + retry end + end - $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } - - response = http.request(req) - - if response.code[0] != ?2 - raise "Treasure Data upload failed: #{response.body}" + def check_table_exists(key) + unless @table_list.include?(key) + begin + @table_list = get_table_list + rescue + $log.warn "failed to update table list on Treasure Data", :error=>$!.to_s + $log.debug_backtrace $! + end + unless @table_list.include?(key) + database, table = key.split('.',2) + raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it." + end end end def get_table_list $log.info "updating table list from Treasure Data" list = [] - api_list_database.each {|db| - api_list_table(db).each {|t| - list << "#{db}.#{t}" + @client.databases.each {|db| + db.tables.each {|tbl| + list << "#{db.name}.#{tbl.name}" } } list - end - - def api_list_database - body = get("/v3/database/list") - js = JSON.load(body) - return js["databases"].map {|m| m['name'] } - end - - def api_list_table(db) - body = get("/v3/table/list/#{e db}") - js = JSON.load(body) - return js["tables"].map {|m| m['name'] } - end - - def get(path) - http, header = new_http - - request = Net::HTTP::Get.new(path, header) - - response = http.request(request) - - if response.code[0] != ?2 - raise "Treasure Data API failed: #{response.body}" - end - - return response.body - end - - def new_http - http = Net::HTTP.new(HOST, PORT) - if USE_SSL - http.use_ssl = true - http.verify_mode = OpenSSL::SSL::VERIFY_PEER - store = OpenSSL::X509::Store.new - http.cert_store = store - end - - # TODO read_timeout - #http.read_timeout = options[:read_timeout] - - header = {} - header['Authorization'] = "TD1 #{@apikey}" - header['Date'] = Time.now.rfc2822 - - return http, header - end - - def e(s) - CGI.escape(s.to_s) end end end