lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.16 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.17

- old
+ new

@@ -72,21 +72,23 @@ require 'json' require 'cgi' # CGI.escape require 'time' # Time#rfc2822 require 'td-client' require 'digest/md5' + require 'stringio' super @tmpdir = nil @apikey = nil @key = nil @key_num_limit = 5120 # TODO @record_size_limit = 32*1024*1024 # TODO - @table_list = [] + @table_list = {} @auto_create_table = true @use_ssl = false @buffer_type = 'file' # overwrite default buffer_type @flush_interval = 300 # overwrite default flush_interval to 5mins + @empty_gz_data = create_empty_gz_data end def configure(conf) super @@ -130,12 +132,10 @@ database = conf['database'] table = conf['table'] if database && table validate_database_and_table_name(database, table, conf) @key = "#{database}.#{table}" - else - raise ConfigError, "'database' and 'table' parameter are required on tdlog output without auto_create_table" unless @auto_create_table end @anonymizes = {} conf.elements.select {|e| e.name == 'anonymize' @@ -157,26 +157,25 @@ @anonymizes[key] = scr } @anonymizes = nil if @anonymizes.empty? @http_proxy = conf['http_proxy'] - @user_agent = "fluent-plugin-td: 0.10.15" # TODO: automatic increment version + @user_agent = "fluent-plugin-td: 0.10.17" # TODO: automatic increment version end def start super @client = TreasureData::Client.new(@apikey, :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent, :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout) - begin - check_table_exists(@key) if @key - rescue => e + + if @key if @auto_create_table - database, table = @key.split('.', 2) + database, table = @key.split('.',2) ensure_database_and_table(database, table) else - raise e + check_table_exists(@key) end end end def emit(tag, es, chain) @@ -270,10 +269,16 @@ ensure w.close if w f.close if f end + def create_empty_gz_data + io = StringIO.new + Zlib::GzipWriter.new(io).close + io.string + end + def upload(database, table, io, size, unique_id) unique_str = unique_id.unpack('C*').map {|x| "%02x" % x }.join $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } begin @@ -295,35 +300,26 @@ raise ne end end def check_table_exists(key) - unless @table_list.include?(key) + unless @table_list.has_key?(key) + database, table = key.split('.',2) + $log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data" + io = StringIO.new(@empty_gz_data) begin - @table_list = get_table_list + @client.import(database, table, "msgpack.gz", io, io.size) + @table_list[key] = true + rescue TreasureData::NotFoundError + raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it." rescue - $log.warn "failed to update table list on Treasure Data", :error=>$!.to_s + $log.warn "failed to check table existence on Treasure Data", :error=>$!.to_s $log.debug_backtrace $! end - unless @table_list.include?(key) - database, table = key.split('.',2) - raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td create-log-table #{database} #{table}' to create it." - end end end - def get_table_list - $log.info "updating table list from Treasure Data" - list = [] - @client.databases.each {|db| - db.tables.each {|tbl| - list << "#{db.name}.#{tbl.name}" - } - } - list - end - def validate_database_and_table_name(database, table, conf) begin TreasureData::API.validate_database_name(database) rescue => e raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}" @@ -340,9 +336,10 @@ begin @client.create_log_table(database, table) rescue TreasureData::NotFoundError @client.create_database(database) @client.create_log_table(database, table) + rescue TreasureData::AlreadyExistsError end end end