lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.16 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.17
- old
+ new
@@ -72,21 +72,23 @@
require 'json'
require 'cgi' # CGI.escape
require 'time' # Time#rfc2822
require 'td-client'
require 'digest/md5'
+ require 'stringio'
super
@tmpdir = nil
@apikey = nil
@key = nil
@key_num_limit = 5120 # TODO
@record_size_limit = 32*1024*1024 # TODO
- @table_list = []
+ @table_list = {}
@auto_create_table = true
@use_ssl = false
@buffer_type = 'file' # overwrite default buffer_type
@flush_interval = 300 # overwrite default flush_interval to 5mins
+ @empty_gz_data = create_empty_gz_data
end
def configure(conf)
super
@@ -130,12 +132,10 @@
database = conf['database']
table = conf['table']
if database && table
validate_database_and_table_name(database, table, conf)
@key = "#{database}.#{table}"
- else
- raise ConfigError, "'database' and 'table' parameter are required on tdlog output without auto_create_table" unless @auto_create_table
end
@anonymizes = {}
conf.elements.select {|e|
e.name == 'anonymize'
@@ -157,26 +157,25 @@
@anonymizes[key] = scr
}
@anonymizes = nil if @anonymizes.empty?
@http_proxy = conf['http_proxy']
- @user_agent = "fluent-plugin-td: 0.10.15" # TODO: automatic increment version
+ @user_agent = "fluent-plugin-td: 0.10.17" # TODO: automatic increment version
end
def start
super
@client = TreasureData::Client.new(@apikey, :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent,
:connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout)
- begin
- check_table_exists(@key) if @key
- rescue => e
+
+ if @key
if @auto_create_table
- database, table = @key.split('.', 2)
+ database, table = @key.split('.',2)
ensure_database_and_table(database, table)
else
- raise e
+ check_table_exists(@key)
end
end
end
def emit(tag, es, chain)
@@ -270,10 +269,16 @@
ensure
w.close if w
f.close if f
end
+ def create_empty_gz_data
+ io = StringIO.new
+ Zlib::GzipWriter.new(io).close
+ io.string
+ end
+
def upload(database, table, io, size, unique_id)
unique_str = unique_id.unpack('C*').map {|x| "%02x" % x }.join
$log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
begin
@@ -295,35 +300,26 @@
raise ne
end
end
def check_table_exists(key)
- unless @table_list.include?(key)
+ unless @table_list.has_key?(key)
+ database, table = key.split('.',2)
+ $log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
+ io = StringIO.new(@empty_gz_data)
begin
- @table_list = get_table_list
+ @client.import(database, table, "msgpack.gz", io, io.size)
+ @table_list[key] = true
+ rescue TreasureData::NotFoundError
+ raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
rescue
- $log.warn "failed to update table list on Treasure Data", :error=>$!.to_s
+ $log.warn "failed to check table existence on Treasure Data", :error=>$!.to_s
$log.debug_backtrace $!
end
- unless @table_list.include?(key)
- database, table = key.split('.',2)
- raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it."
- end
end
end
- def get_table_list
- $log.info "updating table list from Treasure Data"
- list = []
- @client.databases.each {|db|
- db.tables.each {|tbl|
- list << "#{db.name}.#{tbl.name}"
- }
- }
- list
- end
-
def validate_database_and_table_name(database, table, conf)
begin
TreasureData::API.validate_database_name(database)
rescue => e
raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}"
@@ -340,9 +336,10 @@
begin
@client.create_log_table(database, table)
rescue TreasureData::NotFoundError
@client.create_database(database)
@client.create_log_table(database, table)
+ rescue TreasureData::AlreadyExistsError
end
end
end