lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.17 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.18
- old
+ new
@@ -58,10 +58,17 @@
"#{o1}.#{o2}.#{o3}.#{o4}"
end
end
+ # To support log_level option since Fluentd v0.10.43
+ unless method_defined?(:log)
+ define_method(:log) { $log }
+ end
+
+ config_param :endpoint, :string, :default => nil
+
config_param :connect_timeout, :integer, :default => nil
config_param :read_timeout, :integer, :default => nil
config_param :send_timeout, :integer, :default => nil
def initialize
@@ -81,11 +88,11 @@
@key = nil
@key_num_limit = 5120 # TODO
@record_size_limit = 32*1024*1024 # TODO
@table_list = {}
@auto_create_table = true
- @use_ssl = false
+ @use_ssl = true
@buffer_type = 'file' # overwrite default buffer_type
@flush_interval = 300 # overwrite default flush_interval to 5mins
@empty_gz_data = create_empty_gz_data
end
@@ -157,18 +164,26 @@
@anonymizes[key] = scr
}
@anonymizes = nil if @anonymizes.empty?
@http_proxy = conf['http_proxy']
- @user_agent = "fluent-plugin-td: 0.10.17" # TODO: automatic increment version
+ @user_agent = "fluent-plugin-td: 0.10.18" # TODO: automatic increment version
+
+ if @endpoint.nil?
+ $log.warn "tdlog plugin will change the API endpoint from api.treasure-data.com to api.treasuredata.com"
+ $log.warn "If want to keep api.treasure-data.com, please set 'endpoint api.treasure-data.com' in tdlog configuration"
+ end
end
def start
super
- @client = TreasureData::Client.new(@apikey, :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent,
- :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout)
+ client_opts = {
+ :ssl => @use_ssl, :http_proxy => @http_proxy, :user_agent => @user_agent, :endpoint => @endpoint,
+ :connect_timeout => @connect_timeout, :read_timeout => @read_timeout, :send_timeout => @send_timeout
+ }
+ @client = TreasureData::Client.new(@apikey, client_opts)
if @key
if @auto_create_table
database, table = @key.split('.',2)
ensure_database_and_table(database, table)
@@ -216,12 +231,12 @@
rescue
# TODO (a) Remove the transaction mechanism of fluentd
# or (b) keep transaction boundaries in in/out_forward.
# This code disables the transaction mechanism (a).
- $log.error "#{$!}: #{summarize_record(record)}"
- $log.error_backtrace $!.backtrace
+ log.error "#{$!}: #{summarize_record(record)}"
+ log.error_backtrace $!.backtrace
next
end
begin
record.to_msgpack(out)
@@ -232,11 +247,11 @@
noff = out.bytesize
sz = noff - off
if sz > @record_size_limit
# TODO don't raise error
#raise "Size of a record too large (#{sz} bytes)" # TODO include summary of the record
- $log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
+ log.warn "Size of a record too large (#{sz} bytes): #{summarize_record(record)}"
end
off = noff
}
out
end
@@ -277,11 +292,11 @@
io.string
end
def upload(database, table, io, size, unique_id)
unique_str = unique_id.unpack('C*').map {|x| "%02x" % x }.join
- $log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
+ log.trace { "uploading logs to Treasure Data database=#{database} table=#{table} (#{size}bytes)" }
begin
begin
start = Time.now
@client.import(database, table, "msgpack.gz", io, size, unique_str)
@@ -293,29 +308,29 @@
io.pos = 0
retry
end
rescue => e
elapsed = Time.now - start
- ne = RuntimeError.new("Failed to upload to TreasureData: #{$!} (#{size} bytes; #{elapsed} seconds)")
+ ne = RuntimeError.new("Failed to upload to Treasure Data '#{database}.#{table}' table: #{$!} (#{size} bytes; #{elapsed} seconds)")
ne.set_backtrace(e.backtrace)
raise ne
end
end
def check_table_exists(key)
unless @table_list.has_key?(key)
database, table = key.split('.',2)
- $log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
+ log.debug "checking whether table '#{database}.#{table}' exists on Treasure Data"
io = StringIO.new(@empty_gz_data)
begin
@client.import(database, table, "msgpack.gz", io, io.size)
@table_list[key] = true
rescue TreasureData::NotFoundError
raise "Table #{key.inspect} does not exist on Treasure Data. Use 'td table:create #{database} #{table}' to create it."
rescue
- $log.warn "failed to check table existence on Treasure Data", :error=>$!.to_s
- $log.debug_backtrace $!
+ log.warn "failed to check existence of '#{database}.#{table}' table on Treasure Data", :error=>$!.to_s
+ log.debug_backtrace $!
end
end
end
def validate_database_and_table_name(database, table, conf)
@@ -330,10 +345,10 @@
raise ConfigError, "Invalid table name #{table.inspect}: #{e}: #{conf}"
end
end
def ensure_database_and_table(database, table)
- $log.info "Creating table #{database}.#{table} on TreasureData"
+ log.info "Creating table #{database}.#{table} on TreasureData"
begin
@client.create_log_table(database, table)
rescue TreasureData::NotFoundError
@client.create_database(database)
@client.create_log_table(database, table)