lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.14 vs lib/fluent/plugin/out_tdlog.rb in fluent-plugin-td-0.10.15

- old
+ new

@@ -69,11 +69,11 @@ require 'cgi' # CGI.escape require 'time' # Time#rfc2822 require 'td-client' require 'digest/md5' super - @tmpdir = '/tmp/fluent/tdlog' + @tmpdir = nil @apikey = nil @key = nil @key_num_limit = 5120 # TODO @record_size_limit = 32*1024*1024 # TODO @table_list = [] @@ -89,12 +89,14 @@ # overwrite default value of buffer_chunk_limit if @buffer.respond_to?(:buffer_chunk_limit=) && !conf['buffer_chunk_limit'] @buffer.buffer_chunk_limit = IMPORT_SIZE_LIMIT end - @tmpdir = conf['tmpdir'] || @tmpdir - FileUtils.mkdir_p(@tmpdir) + if conf.has_key?('tmpdir') + @tmpdir = conf['tmpdir'] + FileUtils.mkdir_p(@tmpdir) + end @apikey = conf['apikey'] unless @apikey raise ConfigError, "'apikey' parameter is required on tdlog output" end @@ -162,15 +164,16 @@ @anonymizes[key] = scr } @anonymizes = nil if @anonymizes.empty? @http_proxy = conf['http_proxy'] + @user_agent = "fluent-plugin-td: 0.10.15" # TODO: automatic increment version end def start super - @client = TreasureData::Client.new(@apikey, :ssl=>@use_ssl, :http_proxy=>@http_proxy) + @client = TreasureData::Client.new(@apikey, :ssl=>@use_ssl, :http_proxy=>@http_proxy, :user_agent=>@user_agent) unless @auto_create_table check_table_exists(@key) end end @@ -217,11 +220,15 @@ $log.error "#{$!}: #{summarize_record(record)}" $log.error_backtrace $!.backtrace next end - record.to_msgpack(out) + begin + record.to_msgpack(out) + rescue RangeError + TreasureData::API.normalized_msgpack(record, out) + end noff = out.bytesize sz = noff - off if sz > @record_size_limit # TODO don't raise error @@ -244,9 +251,10 @@ def write(chunk) unique_id = chunk.unique_id database, table = chunk.key.split('.',2) + FileUtils.mkdir_p(@tmpdir) unless @tmpdir.nil? f = Tempfile.new("tdlog-", @tmpdir) w = Zlib::GzipWriter.new(f) chunk.write_to(w) w.finish