lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.9.7 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.9.8
- old
+ new
@@ -14,15 +14,19 @@
require 'zlib'
require 'net/http'
require 'json'
require 'cgi' # CGI.escape
require 'time' # Time#rfc2822
+ require 'td-client'
super
@tmpdir = '/tmp/fluent/tdlog'
@apikey = nil
@key = nil
+ @key_num_limit = 5120 # TODO
+ @record_size_limit = 32*1024*1024 # TODO
@table_list = []
+ @auto_create_table = false
end
def configure(conf)
super
@@ -46,14 +50,20 @@
@key = "#{database}.#{table}"
elsif (database && !table) || (!database && table)
raise ConfigError, "'database' and 'table' parameter are required on tdlog output"
end
- @table_list = get_table_list
+ if conf['auto_create_table']
+ @auto_create_table = true
+ end
+ end
- if @key && !@table_list.include?(@key)
- raise ConfigError, "Table #{@key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it."
+ def start
+ super
+ @client = TreasureData::Client.new(@apikey)
+ unless @auto_create_table
+ check_table_exists(@key)
end
end
def emit(tag, es, chain)
if @key
@@ -65,22 +75,12 @@
return
end
key = "#{database}.#{table}"
end
- # check the table exists
- unless @table_list.include?(key)
- begin
- @table_list = get_table_list
- rescue
- $log.warn "failed to update table list on Treasure Data", :error=>$!.to_s
- $log.debug_backtrace $!
- end
- unless @table_list.include?(key)
- database, table = key.split('.',2)
- raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it."
- end
+ unless @auto_create_table
+ check_table_exists(key)
end
super(tag, es, chain, key)
end
@@ -88,14 +88,27 @@
true
end
def format_stream(tag, es)
out = ''
+ off = out.bytesize
es.each {|event|
record = event.record
record['time'] = event.time
+
+ if record.size > @key_num_limit
+ raise "Too many number of keys (#{record.size} keys)" # TODO include summary of the record
+ end
+
record.to_msgpack(out)
+
+ noff = out.bytesize
+ sz = noff - off
+ if sz > @record_size_limit
+ raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
+ end
+ off = noff
}
out
end
def write(chunk)
@@ -120,89 +133,53 @@
w.close if w
f.close if f
end
def upload(database, table, io, size)
- http, header = new_http
- header['Content-Length'] = size.to_s
- header['Content-Type'] = 'application/octet-stream'
+ $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
- url = "/v3/table/import/#{e database}/#{e table}/msgpack.gz"
-
- req = Net::HTTP::Put.new(url, header)
- if req.respond_to?(:body_stream=)
- req.body_stream = io
- else # Ruby 1.8
- req.body = io.read
+ begin
+ @client.import(database, table, "msgpack.gz", io, size)
+ rescue TreasureData::NotFoundError
+ unless @auto_create_table
+ raise $!
+ end
+ $log.info "Creating table #{database}.#{table} on TreasureData"
+ begin
+ @client.create_log_table(database, table)
+ rescue TreasureData::NotFoundError
+ @client.create_database(database)
+ @client.create_log_table(database, table)
+ end
+ io.pos = 0
+ retry
end
+ end
- $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
-
- response = http.request(req)
-
- if response.code[0] != ?2
- raise "Treasure Data upload failed: #{response.body}"
+ def check_table_exists(key)
+ unless @table_list.include?(key)
+ begin
+ @table_list = get_table_list
+ rescue
+ $log.warn "failed to update table list on Treasure Data", :error=>$!.to_s
+ $log.debug_backtrace $!
+ end
+ unless @table_list.include?(key)
+ database, table = key.split('.',2)
+ raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it."
+ end
end
end
def get_table_list
$log.info "updating table list from Treasure Data"
list = []
- api_list_database.each {|db|
- api_list_table(db).each {|t|
- list << "#{db}.#{t}"
+ @client.databases.each {|db|
+ db.tables.each {|tbl|
+ list << "#{db.name}.#{tbl.name}"
}
}
list
- end
-
- def api_list_database
- body = get("/v3/database/list")
- js = JSON.load(body)
- return js["databases"].map {|m| m['name'] }
- end
-
- def api_list_table(db)
- body = get("/v3/table/list/#{e db}")
- js = JSON.load(body)
- return js["tables"].map {|m| m['name'] }
- end
-
- def get(path)
- http, header = new_http
-
- request = Net::HTTP::Get.new(path, header)
-
- response = http.request(request)
-
- if response.code[0] != ?2
- raise "Treasure Data API failed: #{response.body}"
- end
-
- return response.body
- end
-
- def new_http
- http = Net::HTTP.new(HOST, PORT)
- if USE_SSL
- http.use_ssl = true
- http.verify_mode = OpenSSL::SSL::VERIFY_PEER
- store = OpenSSL::X509::Store.new
- http.cert_store = store
- end
-
- # TODO read_timeout
- #http.read_timeout = options[:read_timeout]
-
- header = {}
- header['Authorization'] = "TD1 #{@apikey}"
- header['Date'] = Time.now.rfc2822
-
- return http, header
- end
-
- def e(s)
- CGI.escape(s.to_s)
end
end
end