lib/fluent/plugin/out_tdlog.rb: fluent-plugin-td-0.10.14 vs fluent-plugin-td-0.10.15
- old (0.10.14)
+ new (0.10.15)
@@ -69,11 +69,11 @@
require 'cgi' # CGI.escape
require 'time' # Time#rfc2822
require 'td-client'
require 'digest/md5'
super
- @tmpdir = '/tmp/fluent/tdlog'
+ @tmpdir = nil
@apikey = nil
@key = nil
@key_num_limit = 5120 # TODO
@record_size_limit = 32*1024*1024 # TODO
@table_list = []
@@ -89,12 +89,14 @@
# overwrite default value of buffer_chunk_limit
if @buffer.respond_to?(:buffer_chunk_limit=) && !conf['buffer_chunk_limit']
@buffer.buffer_chunk_limit = IMPORT_SIZE_LIMIT
end
- @tmpdir = conf['tmpdir'] || @tmpdir
- FileUtils.mkdir_p(@tmpdir)
+ if conf.has_key?('tmpdir')
+ @tmpdir = conf['tmpdir']
+ FileUtils.mkdir_p(@tmpdir)
+ end
@apikey = conf['apikey']
unless @apikey
raise ConfigError, "'apikey' parameter is required on tdlog output"
end
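
Taken together, the two tmpdir hunks above change the default behavior: the plugin no longer creates /tmp/fluent/tdlog unconditionally, and @tmpdir stays nil unless the user sets a `tmpdir` parameter explicitly. A minimal sketch of the resulting configure-time logic, with `conf` standing in for Fluentd's parsed configuration hash and `resolve_tmpdir` a hypothetical helper, not code from the plugin:

    require 'fileutils'

    # No 'tmpdir' key: return nil and create nothing under /tmp.
    # With a key: create the directory eagerly, as configure now does.
    def resolve_tmpdir(conf)
      return nil unless conf.has_key?('tmpdir')
      FileUtils.mkdir_p(conf['tmpdir'])
      conf['tmpdir']
    end

    resolve_tmpdir({})                            # => nil (the new default)
    resolve_tmpdir('tmpdir' => '/var/tmp/tdlog')  # => "/var/tmp/tdlog", dir created
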
@@ -162,15 +164,16 @@
@anonymizes[key] = scr
}
@anonymizes = nil if @anonymizes.empty?
@http_proxy = conf['http_proxy']
+ @user_agent = "fluent-plugin-td: 0.10.15" # TODO: automatic increment version
end
def start
super
- @client = TreasureData::Client.new(@apikey, :ssl=>@use_ssl, :http_proxy=>@http_proxy)
+ @client = TreasureData::Client.new(@apikey, :ssl=>@use_ssl, :http_proxy=>@http_proxy, :user_agent=>@user_agent)
unless @auto_create_table
check_table_exists(@key)
end
end
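
The User-Agent string introduced above hardcodes the plugin version, which the TODO acknowledges. One sketch of how that TODO could be resolved (this assumes the gem is loaded through RubyGems; it is not what 0.10.15 actually ships):

    # Hypothetical: derive the version from the installed gem specification
    # instead of hardcoding "0.10.15" in the string.
    spec = Gem.loaded_specs['fluent-plugin-td']
    version = spec ? spec.version.to_s : 'unknown'
    @user_agent = "fluent-plugin-td: #{version}"
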
@@ -217,11 +220,15 @@
$log.error "#{$!}: #{summarize_record(record)}"
$log.error_backtrace $!.backtrace
next
end
- record.to_msgpack(out)
+ begin
+ record.to_msgpack(out)
+ rescue RangeError
+ TreasureData::API.normalized_msgpack(record, out)
+ end
noff = out.bytesize
sz = noff - off
if sz > @record_size_limit
# TODO don't raise error
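
The begin/rescue added above handles records whose integer values fall outside the range msgpack can encode (beyond 64 bits), which makes Hash#to_msgpack raise RangeError; the rescue re-packs the record through TreasureData::API.normalized_msgpack instead. A rough self-contained illustration of the failure mode and the recovery idea; the inline normalization is a stand-in for what the td-client helper does, not its actual source:

    require 'msgpack'

    record = { 'time' => 1234567890, 'counter' => 2**64 }  # 2**64 exceeds uint64

    begin
      packed = record.to_msgpack
    rescue RangeError
      # Stand-in for TreasureData::API.normalized_msgpack: stringify integers
      # msgpack cannot represent, then pack the record again.
      normalized = {}
      record.each_pair { |k, v|
        v = v.to_s if v.is_a?(Integer) && (v < -2**63 || v > 2**64 - 1)
        normalized[k] = v
      }
      packed = normalized.to_msgpack
    end
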
@@ -244,9 +251,10 @@
def write(chunk)
unique_id = chunk.unique_id
database, table = chunk.key.split('.',2)
+ FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil?
f = Tempfile.new("tdlog-", @tmpdir)
w = Zlib::GzipWriter.new(f)
chunk.write_to(w)
w.finish
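
The mkdir_p guard re-creates a user-configured tmpdir at write time in case it disappeared between flushes. When @tmpdir is nil (the new default), nothing needs to be created because Tempfile falls back to the system temporary directory when its directory argument is nil; a quick check of that fallback, assuming MRI's standard tempfile library:

    require 'tempfile'

    # With a nil directory argument, Tempfile places the file under
    # Dir.tmpdir, so the nil default from configure needs no setup.
    f = Tempfile.new("tdlog-", nil)
    puts f.path   # e.g. "/tmp/tdlog-20130101-1234-abcdef" on Linux
    f.close!      # close and unlink immediately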