lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.0 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.1
- old
+ new
@@ -65,15 +65,19 @@
table = conf['table']
if !database || !table
raise ConfigError, "'database' and 'table' parameter are required on tdlog output"
end
- if !validate_name(database)
- raise ConfigError, "Invalid database name #{database.inspect}: #{conf}"
+ begin
+ TreasureData::API.validate_database_name(database)
+ rescue
+ raise ConfigError, "Invalid database name #{database.inspect}: #{$!}: #{conf}"
end
- if !validate_name(table)
- raise ConfigError, "Invalid table name #{table.inspect}: #{conf}"
+ begin
+ TreasureData::API.validate_table_name(table)
+ rescue
+ raise ConfigError, "Invalid table name #{table.inspect}: #{$!}: #{conf}"
end
@key = "#{database}.#{table}"
end
end
@@ -88,28 +92,22 @@
def emit(tag, es, chain)
if @key
key = @key
else
database, table = tag.split('.')[-2,2]
- if !validate_name(database) || !validate_name(table)
- $log.debug { "Invalid tag #{tag.inspect}" }
- return
- end
+ TreasureData::API.validate_database_name(database)
+ TreasureData::API.validate_table_name(table)
key = "#{database}.#{table}"
end
unless @auto_create_table
check_table_exists(key)
end
super(tag, es, chain, key)
end
- def validate_name(name)
- true
- end
-
def format_stream(tag, es)
out = ''
off = out.bytesize
es.each {|time,record|
record['time'] = time
@@ -130,14 +128,10 @@
out
end
def write(chunk)
database, table = chunk.key.split('.',2)
- if !validate_name(database) || !validate_name(table)
- $log.error "Invalid key name #{chunk.key.inspect}"
- return
- end
f = Tempfile.new("tdlog-", @tmpdir)
w = Zlib::GzipWriter.new(f)
chunk.write_to(w)
@@ -155,23 +149,31 @@
def upload(database, table, io, size)
$log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
begin
- @client.import(database, table, "msgpack.gz", io, size)
- rescue TreasureData::NotFoundError
- unless @auto_create_table
- raise $!
- end
- $log.info "Creating table #{database}.#{table} on TreasureData"
begin
- @client.create_log_table(database, table)
+ start = Time.now
+ @client.import(database, table, "msgpack.gz", io, size)
rescue TreasureData::NotFoundError
- @client.create_database(database)
- @client.create_log_table(database, table)
+ unless @auto_create_table
+ raise $!
+ end
+ $log.info "Creating table #{database}.#{table} on TreasureData"
+ begin
+ @client.create_log_table(database, table)
+ rescue TreasureData::NotFoundError
+ @client.create_database(database)
+ @client.create_log_table(database, table)
+ end
+ io.pos = 0
+ retry
end
- io.pos = 0
- retry
+ rescue => e
+ elapsed = Time.now - start
+ ne = RuntimeError.new("Failed to upload to TreasureData: #{$!} (#{size} bytes; #{elapsed} seconds)")
+ ne.set_backtrace(e.backtrace)
+ raise ne
end
end
def check_table_exists(key)
unless @table_list.include?(key)