lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.17 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.18

- old
+ new

@@ -58,10 +58,17 @@ "#{o1}.#{o2}.#{o3}.#{o4}" end end + # To support log_level option since Fluentd v0.10.43 + unless method_defined?(:log) + define_method(:log) { $log } + end + + config_param :endpoint, :string, :default => nil + config_param :connect_timeout, :integer, :default => nil config_param :read_timeout, :integer, :default => nil config_param :send_timeout, :integer, :default => nil def initialize @@ -81,11 +88,11 @@ @key = nil @key_num_limit = 5120 # TODO @record_size_limit = 32*1024*1024 # TODO @table_list = {} @auto_create_table = true - @use_ssl = false + @use_ssl = true @buffer_type = 'file' # overwrite default buffer_type @flush_interval = 300 # overwrite default flush_interval to 5mins @empty_gz_data = create_empty_gz_data end @@ -157,18 +164,26 @@ @anonymizes[key] = scr } @anonymizes = nil if @anonymizes.empty? @http_proxy = conf['http_proxy'] - @user_agent = "fluent-plugin-td: 0.10.17" # TODO: automatic increment version + @user_agent = "fluent-plugin-td: 0.10.18" # TODO: automatic increment version + + if @endpoint.nil? + $log.warn "tdlog plugin will change the API endpoint from api.treasure-data.com to api.treasuredata.com" + $log.warn "If want to keep api.treasure-data.com, please set 'endpoint api.treasure-data.com' in tdlog configuration" + end end def start super - @client = TreasureData::Client.new(@apikey, :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent, - :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout) + client_opts = { + :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent, :endpoint => @endpoint, + :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout + } + @client = TreasureData::Client.new(@apikey, client_opts) if @key if @auto_create_table database, table = @key.split('.',2) ensure_database_and_table(database, table) @@ -216,12 +231,12 @@ rescue # TODO (a) Remove the transaction mechanism of fluentd # or (b) keep transaction boundaries in in/out_forward. # This code disables the transaction mechanism (a). - $log.error "#{$!}: #{summarize_record(record)}" - $log.error_backtrace $!.backtrace + log.error "#{$!}: #{summarize_record(record)}" + log.error_backtrace $!.backtrace next end begin record.to_msgpack(out) @@ -232,11 +247,11 @@ noff = out.bytesize sz = noff - off if sz > @record_size_limit # TODO don't raise error #raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record - $log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}" + log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}" end off = noff } out end @@ -277,11 +292,11 @@ io.string end def upload(database, table, io, size, unique_id) unique_str = unique_id.unpack('C*').map {|x| "%02x" % x }.join - $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } + log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" } begin begin start = Time.now @client.import(database, table, "msgpack.gz", io, size, unique_str) @@ -293,29 +308,29 @@ io.pos = 0 retry end rescue => e elapsed = Time.now - start - ne = RuntimeError.new("Failed to upload to TreasureData: #{$!} (#{size} bytes; #{elapsed} seconds)") + ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{$!} (#{size} bytes; #{elapsed} seconds)") ne.set_backtrace(e.backtrace) raise ne end end def check_table_exists(key) unless @table_list.has_key?(key) database, table = key.split('.',2) - $log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data" + log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data" io = StringIO.new(@empty_gz_data) begin @client.import(database, table, "msgpack.gz", io, io.size) @table_list[key] = true rescue TreasureData::NotFoundError raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it." rescue - $log.warn "failed to check table existence on Treasure Data", :error=>$!.to_s - $log.debug_backtrace $! + log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error=>$!.to_s + log.debug_backtrace $! end end end def validate_database_and_table_name(database, table, conf) @@ -330,10 +345,10 @@ raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}" end end def ensure_database_and_table(database, table) - $log.info "Creating table #{database}.#{table} on TreasureData" + log.info "Creating table #{database}.#{table} on TreasureData" begin @client.create_log_table(database, table) rescue TreasureData::NotFoundError @client.create_database(database) @client.create_log_table(database, table)