lib/td/logger/td_logger.rb in td-logger-0.3.9 vs lib/td/logger/td_logger.rb in td-logger-0.3.10

- old
+ new

@@ -90,14 +90,15 @@
           @cond.signal
         }
         @upload_thread.join if @upload_thread
 
         @map.each {|(db,table),buffer|
-          upload(db, table, buffer)
+          data = buffer.flush!
+          upload(db, table, data)
         }
-        @queue.each {|tuple|
-          upload(*tuple)
+        @queue.each {|db,table,data|
+          upload(db, table, data)
         }
       end
     end
 
     def post_with_time(tag, record, time)
@@ -147,10 +148,50 @@
     ensure
       @mutex.unlock
     end
 
     private
 
+    MAX_KEY_CARDINALITY = 512
+    WARN_KEY_CARDINALITY = 256
+
+    class Buffer
+      def initialize
+        @key_set = {}
+        @data = StringIO.new
+        @gz = Zlib::GzipWriter.new(@data)
+      end
+
+      def key_set_size
+        @key_set.size
+      end
+
+      def update_key_set(record)
+        record.each_key {|key|
+          @key_set[key] = true
+        }
+        @key_set.size
+      end
+
+      def append(data)
+        @gz << data
+        map = MessagePack.unpack(data)
+      end
+
+      def size
+        @data.size
+      end
+
+      def flush!
+        close
+        @data.string
+      end
+
+      def close
+        @gz.close unless @gz.closed?
+      end
+    end
+
     def to_msgpack(msg)
       begin
         msg.to_msgpack
       rescue NoMethodError
         JSON.load(JSON.dump(msg)).to_msgpack
@@ -184,16 +225,34 @@
       if @queue.length > @queue_limit
         @logger.error("TreasureDataLogger: queue length exceeds limit. can't add new event log: #{msg.inspect}")
         return false
       end
 
-      buffer = (@map[key] ||= '')
+      buffer = (@map[key] ||= Buffer.new)
 
-      buffer << data
+      record = MessagePack.unpack(data)
+      unless record.is_a?(Hash)
+        @logger.error("TreasureDataLogger: record must be a Hash: #{msg.inspect}")
+        return false
+      end
+
+      before = buffer.key_set_size
+      after = buffer.update_key_set(record)
+      if after > MAX_KEY_CARDINALITY
+        @logger.error("TreasureDataLogger: kind of keys in a buffer exceeds #{MAX_KEY_CARDINALITY}.")
+        @map.delete(key)
+        return false
+      end
+      if before <= WARN_KEY_CARDINALITY && after > WARN_KEY_CARDINALITY
+        @logger.warn("TreasureDataLogger: kind of keys in a buffer exceeds #{WARN_KEY_CARDINALITY} which is too large. please check the schema design.")
+      end
+
+      buffer.append(data)
 
       if buffer.size > @chunk_limit
-        @queue << [db, table, buffer]
+        data = buffer.flush!
+        @queue << [db, table, data]
         @map.delete(key)
         @cond.signal
       end
 
       # start upload thread if it's not run
@@ -207,22 +266,23 @@
     def try_flush
       @mutex.synchronize do
         if @queue.empty?
           @map.reject! {|(db,table),buffer|
-            @queue << [db, table, buffer]
+            data = buffer.flush!
+            @queue << [db, table, data]
           }
         end
       end
 
       flushed = false
 
       until @queue.empty?
-        tuple = @queue.first
+        db, table, data = @queue.first
 
         begin
-          upload(*tuple)
+          upload(db, table, data)
           @queue.shift
           @error_count = 0
           flushed = true
         rescue
           if @error_count < @retry_limit
@@ -241,14 +301,12 @@
       end
 
       flushed
     end
 
-    def upload(db, table, buffer)
+    def upload(db, table, data)
       begin
-        out = StringIO.new
-        Zlib::GzipWriter.wrap(out) {|gz| gz.write buffer }
-        stream = StringIO.new(out.string)
+        stream = StringIO.new(data)
         @logger.debug "Uploading event logs to #{db}.#{table} table on Treasure Data (#{stream.size} bytes)"
         @client.import(db, table, "msgpack.gz", stream, stream.size)
       rescue TreasureData::NotFoundError
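
What changed, in short: 0.3.9 kept each in-memory buffer as a plain String of
concatenated MessagePack records and gzipped it only inside upload; 0.3.10's
Buffer class compresses incrementally as records are appended, so flush!
returns a ready-made msgpack.gz payload and upload just wraps it in a
StringIO. A minimal standalone sketch of that pipeline (plain Ruby with the
msgpack gem; the variable names here are illustrative, not the gem's code):

    require 'msgpack'
    require 'stringio'
    require 'zlib'

    # Incrementally gzip msgpack-encoded records, as Buffer#append does.
    data = StringIO.new
    gz = Zlib::GzipWriter.new(data)
    gz << {"time" => 1_357_000_000, "action" => "login"}.to_msgpack
    gz << {"time" => 1_357_000_001, "action" => "logout"}.to_msgpack

    # Buffer#flush! closes the GzipWriter (writing the gzip trailer) and
    # returns the underlying string: a complete msgpack.gz payload.
    gz.close
    payload = data.string

    # Round-trip check: gunzip, then unpack the concatenated records.
    raw = Zlib::GzipReader.new(StringIO.new(payload)).read
    MessagePack::Unpacker.new.feed_each(raw) {|record| p record }
    # => {"time"=>1357000000, "action"=>"login"}
    # => {"time"=>1357000001, "action"=>"logout"}

One side effect worth noting: since Buffer#size reports the compressed
StringIO, the chunk check (buffer.size > @chunk_limit) now measures
compressed bytes, where 0.3.9 compared the raw msgpack length.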