lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.15 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.16

- old
+ new

@@ -58,10 +58,14 @@ "#{o1}.#{o2}.#{o3}.#{o4}" end end + config_param :connect_timeout, :integer, :default => nil + config_param :read_timeout, :integer, :default => nil + config_param :send_timeout, :integer, :default => nil + def initialize require 'fileutils' require 'tempfile' require 'zlib' require 'net/http' @@ -121,28 +125,17 @@ raise ConfigError, "'true' or 'false' is required for use_ssl option on tdlog output" end end end - unless @auto_create_table - database = conf['database'] - table = conf['table'] - - if !database || !table - raise ConfigError, "'database' and 'table' parameter are required on tdlog output" - end - begin - TreasureData::API.validate_database_name(database) - rescue - raise ConfigError, "Invalid database name #{database.inspect}: #{$!}: #{conf}" - end - begin - TreasureData::API.validate_table_name(table) - rescue - raise ConfigError, "Invalid table name #{table.inspect}: #{$!}: #{conf}" - end + database = conf['database'] + table = conf['table'] + if database && table + validate_database_and_table_name(database, table, conf) @key = "#{database}.#{table}" + else + raise ConfigError, "'database' and 'table' parameter are required on tdlog output without auto_create_table" unless @auto_create_table end @anonymizes = {} conf.elements.select {|e| e.name == 'anonymize' @@ -169,13 +162,22 @@ @user_agent = "fluent-plugin-td: 0.10.15" # TODO: automatic increment version end def start super - @client = TreasureData::Client.new(@apikey, :ssl=>@use_ssl, :http_proxy=>@http_proxy, :user_agent=>@user_agent) - unless @auto_create_table - check_table_exists(@key) + + @client = TreasureData::Client.new(@apikey, :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent, + :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout) + begin + check_table_exists(@key) if @key + rescue => e + if @auto_create_table + database, table = @key.split('.', 2) + ensure_database_and_table(database, table) + else + raise e + end end end def emit(tag, es, chain) if @key @@ -280,17 +282,11 @@ @client.import(database, table, "msgpack.gz", io, size, unique_str) rescue TreasureData::NotFoundError unless @auto_create_table raise $! end - $log.info "Creating table #{database}.#{table} on TreasureData" - begin - @client.create_log_table(database, table) - rescue TreasureData::NotFoundError - @client.create_database(database) - @client.create_log_table(database, table) - end + ensure_database_and_table(database, table) io.pos = 0 retry end rescue => e elapsed = Time.now - start @@ -322,9 +318,32 @@ db.tables.each {|tbl| list << "#{db.name}.#{tbl.name}" } } list + end + + def validate_database_and_table_name(database, table, conf) + begin + TreasureData::API.validate_database_name(database) + rescue => e + raise ConfigError, "Invalid database name #{database.inspect}: #{e}: #{conf}" + end + begin + TreasureData::API.validate_table_name(table) + rescue => e + raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}" + end + end + + def ensure_database_and_table(database, table) + $log.info "Creating table #{database}.#{table} on TreasureData" + begin + @client.create_log_table(database, table) + rescue TreasureData::NotFoundError + @client.create_database(database) + @client.create_log_table(database, table) + end end end end