lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.0 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.1

- old
+ new

@@ -65,15 +65,19 @@ table = conf['table'] if !database || !table raise ConfigError, "'database' and 'table' parameter are required on tdlog output" end - if !validate_name(database) - raise ConfigError, "Invalid database name #{database.inspect}: #{conf}" + begin + TreasureData::API.validate_database_name(database) + rescue + raise ConfigError, "Invalid database name #{database.inspect}: #{$!}: #{conf}" end - if !validate_name(table) - raise ConfigError, "Invalid table name #{table.inspect}: #{conf}" + begin + TreasureData::API.validate_table_name(table) + rescue + raise ConfigError, "Invalid table name #{table.inspect}: #{$!}: #{conf}" end @key = "#{database}.#{table}" end end @@ -88,28 +92,22 @@ def emit(tag, es, chain) if @key key = @key else database, table = tag.split('.')[-2,2] - if !validate_name(database) || !validate_name(table) - $log.debug { "Invalid tag #{tag.inspect}" } - return - end + TreasureData::API.validate_database_name(database) + TreasureData::API.validate_table_name(table) key = "#{database}.#{table}" end unless @auto_create_table check_table_exists(key) end super(tag, es, chain, key) end - def validate_name(name) - true - end - def format_stream(tag, es) out = '' off = out.bytesize es.each {|time,record| record['time'] = time @@ -130,14 +128,10 @@ out end def write(chunk) database, table = chunk.key.split('.',2) - if !validate_name(database) || !validate_name(table) - $log.error "Invalid key name #{chunk.key.inspect}" - return - end f = Tempfile.new("tdlog-", @tmpdir) w = Zlib::GzipWriter.new(f) chunk.write_to(w) @@ -155,23 +149,31 @@ def upload(database, table, io, size) $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } begin - @client.import(database, table, "msgpack.gz", io, size) - rescue TreasureData::NotFoundError - unless @auto_create_table - raise $! - end - $log.info "Creating table #{database}.#{table} on TreasureData" begin - @client.create_log_table(database, table) + start = Time.now + @client.import(database, table, "msgpack.gz", io, size) rescue TreasureData::NotFoundError - @client.create_database(database) - @client.create_log_table(database, table) + unless @auto_create_table + raise $! + end + $log.info "Creating table #{database}.#{table} on TreasureData" + begin + @client.create_log_table(database, table) + rescue TreasureData::NotFoundError + @client.create_database(database) + @client.create_log_table(database, table) + end + io.pos = 0 + retry end - io.pos = 0 - retry + rescue => e + elapsed = Time.now - start + ne = RuntimeError.new("Failed to upload to TreasureData: #{$!} (#{size} bytes; #{elapsed} seconds)") + ne.set_backtrace(e.backtrace) + raise ne end end def check_table_exists(key) unless @table_list.include?(key)