lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.15 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.16
- old
+ new
@@ -58,10 +58,14 @@
"#{o1}.#{o2}.#{o3}.#{o4}"
end
end
+ config_param :connect_timeout, :integer, :default => nil
+ config_param :read_timeout, :integer, :default => nil
+ config_param :send_timeout, :integer, :default => nil
+
def initialize
require 'fileutils'
require 'tempfile'
require 'zlib'
require 'net/http'
@@ -121,28 +125,17 @@
raise ConfigError, "'true' or 'false' is required for use_ssl option on tdlog output"
end
end
end
- unless @auto_create_table
- database = conf['database']
- table = conf['table']
-
- if !database || !table
- raise ConfigError, "'database' and 'table' parameter are required on tdlog output"
- end
- begin
- TreasureData::API.validate_database_name(database)
- rescue
- raise ConfigError, "Invalid database name #{database.inspect}: #{$!}: #{conf}"
- end
- begin
- TreasureData::API.validate_table_name(table)
- rescue
- raise ConfigError, "Invalid table name #{table.inspect}: #{$!}: #{conf}"
- end
+ database = conf['database']
+ table = conf['table']
+ if database && table
+ validate_database_and_table_name(database, table, conf)
@key = "#{database}.#{table}"
+ else
+ raise ConfigError, "'database' and 'table' parameter are required on tdlog output without auto_create_table" unless @auto_create_table
end
@anonymizes = {}
conf.elements.select {|e|
e.name == 'anonymize'
@@ -169,13 +162,22 @@
@user_agent = "fluent-plugin-td: 0.10.15" # TODO: automatic increment version
end
def start
super
- @client = TreasureData::Client.new(@apikey, :ssl=>@use_ssl, :http_proxy=>@http_proxy, :user_agent=>@user_agent)
- unless @auto_create_table
- check_table_exists(@key)
+
+ @client = TreasureData::Client.new(@apikey, :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent,
+ :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout)
+ begin
+ check_table_exists(@key) if @key
+ rescue => e
+ if @auto_create_table
+ database, table = @key.split('.', 2)
+ ensure_database_and_table(database, table)
+ else
+ raise e
+ end
end
end
def emit(tag, es, chain)
if @key
@@ -280,17 +282,11 @@
@client.import(database, table, "msgpack.gz", io, size, unique_str)
rescue TreasureData::NotFoundError
unless @auto_create_table
raise $!
end
- $log.info "Creating table #{database}.#{table} on TreasureData"
- begin
- @client.create_log_table(database, table)
- rescue TreasureData::NotFoundError
- @client.create_database(database)
- @client.create_log_table(database, table)
- end
+ ensure_database_and_table(database, table)
io.pos = 0
retry
end
rescue => e
elapsed = Time.now - start
@@ -322,9 +318,32 @@
db.tables.each {|tbl|
list << "#{db.name}.#{tbl.name}"
}
}
list
+ end
+
+ def validate_database_and_table_name(database, table, conf)
+ begin
+ TreasureData::API.validate_database_name(database)
+ rescue => e
+ raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}"
+ end
+ begin
+ TreasureData::API.validate_table_name(table)
+ rescue => e
+ raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}"
+ end
+ end
+
+ def ensure_database_and_table(database, table)
+ $log.info "Creating table #{database}.#{table} on TreasureData"
+ begin
+ @client.create_log_table(database, table)
+ rescue TreasureData::NotFoundError
+ @client.create_database(database)
+ @client.create_log_table(database, table)
+ end
end
end
end