lib/td/logger/td_logger.rb in td-logger-0.3.9 vs lib/td/logger/td_logger.rb in td-logger-0.3.10
- old
+ new
@@ -90,14 +90,15 @@
@cond.signal
}
@upload_thread.join if @upload_thread
@map.each {|(db,table),buffer|
- upload(db, table, buffer)
+ data = buffer.flush!
+ upload(db, table, data)
}
- @queue.each {|tuple|
- upload(*tuple)
+ @queue.each {|db,table,data|
+ upload(db, table, data)
}
end
end
def post_with_time(tag, record, time)
@@ -147,10 +148,50 @@
ensure
@mutex.unlock
end
private
+ MAX_KEY_CARDINALITY = 512
+ WARN_KEY_CARDINALITY = 256
+
+ class Buffer
+ def initialize
+ @key_set = {}
+ @data = StringIO.new
+ @gz = Zlib::GzipWriter.new(@data)
+ end
+
+ def key_set_size
+ @key_set.size
+ end
+
+ def update_key_set(record)
+ record.each_key {|key|
+ @key_set[key] = true
+ }
+ @key_set.size
+ end
+
+ def append(data)
+ @gz << data
+ end
+
+ def size
+ @data.size
+ end
+
+ def flush!
+ close
+ @data.string
+ end
+
+ def close
+ @gz.close unless @gz.closed?
+ end
+ end
+
def to_msgpack(msg)
begin
msg.to_msgpack
rescue NoMethodError
JSON.load(JSON.dump(msg)).to_msgpack
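
Note: the Buffer class introduced above replaces the plain String buffers of 0.3.9. Each db.table entry now gzip-compresses msgpack-encoded records as they are appended, instead of compressing the whole chunk at upload time, and tracks the set of column names it has seen. A minimal standalone sketch of that lifecycle, assuming only stdlib plus the msgpack gem (the names here are illustrative, not part of the gem):

    require 'stringio'
    require 'zlib'
    require 'msgpack'

    data = StringIO.new
    gz   = Zlib::GzipWriter.new(data)

    # append msgpack-encoded records, as Buffer#append does
    gz << { "time" => Time.now.to_i, "action" => "login" }.to_msgpack
    gz << { "time" => Time.now.to_i, "action" => "logout" }.to_msgpack

    # as Buffer#flush! does: close the gzip stream, then take the bytes
    gz.close
    chunk = data.string   # gzip-compressed msgpack stream, ready to import

    # sanity check: decompress and unpack the records again
    records = []
    raw = Zlib::GzipReader.new(StringIO.new(chunk)).read
    MessagePack::Unpacker.new.feed_each(raw) {|obj| records << obj }
    # records == [{"time"=>..., "action"=>"login"}, {"time"=>..., "action"=>"logout"}]
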
@@ -184,16 +225,34 @@
if @queue.length > @queue_limit
@logger.error("TreasureDataLogger: queue length exceeds limit. can't add new event log: #{msg.inspect}")
return false
end
- buffer = (@map[key] ||= '')
+ buffer = (@map[key] ||= Buffer.new)
- buffer << data
+ record = MessagePack.unpack(data)
+ unless record.is_a?(Hash)
+ @logger.error("TreasureDataLogger: record must be a Hash: #{msg.inspect}")
+ return false
+ end
+ before = buffer.key_set_size
+ after = buffer.update_key_set(record)
+ if after > MAX_KEY_CARDINALITY
+ @logger.error("TreasureDataLogger: kind of keys in a buffer exceeds #{MAX_KEY_CARDINALITY}.")
+ @map.delete(key)
+ return false
+ end
+ if before <= WARN_KEY_CARDINALITY && after > WARN_KEY_CARDINALITY
+ @logger.warn("TreasureDataLogger: kind of keys in a buffer exceeds #{WARN_KEY_CARDINALITY} which is too large. please check the schema design.")
+ end
+
+ buffer.append(data)
+
if buffer.size > @chunk_limit
- @queue << [db, table, buffer]
+ data = buffer.flush!
+ @queue << [db, table, data]
@map.delete(key)
@cond.signal
end
# start upload thread if it's not running
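
The write path above now unpacks each record so that the buffer can track how many distinct column names it has seen. Crossing WARN_KEY_CARDINALITY (256) only logs a warning; exceeding MAX_KEY_CARDINALITY (512) rejects the record and discards the buffer for that db.table key. A rough illustration of the threshold bookkeeping using the Buffer API shown above (the warn/reject decisions themselves live in the caller, so they are only described in comments here):

    buffer = Buffer.new

    record_a = { "user_id" => 1, "path" => "/" }           # 2 distinct keys
    record_b = { "user_id" => 2, "referer" => "google" }   # 1 new key

    buffer.update_key_set(record_a)   # => 2
    buffer.update_key_set(record_b)   # => 3, the union of keys seen so far

    # A pathological schema (for example, one column per user id) grows the
    # key set without bound; that is what the thresholds guard against.
    300.times {|i| buffer.update_key_set("col_#{i}" => i) }
    buffer.key_set_size   # => 303, past WARN_KEY_CARDINALITY, so the caller
                          # warns; past 512 it drops the buffer entirely
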
@@ -207,22 +266,23 @@
def try_flush
@mutex.synchronize do
if @queue.empty?
@map.reject! {|(db,table),buffer|
- @queue << [db, table, buffer]
+ data = buffer.flush!
+ @queue << [db, table, data]
}
end
end
flushed = false
until @queue.empty?
- tuple = @queue.first
+ db, table, data = @queue.first
begin
- upload(*tuple)
+ upload(db, table, data)
@queue.shift
@error_count = 0
flushed = true
rescue
if @error_count < @retry_limit
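
One subtlety in try_flush above: @map.reject! is used for its side effect of draining the map into the queue. Array#<< returns the array itself, which is always truthy, so reject! removes every entry it visits after enqueuing it. A tiny standalone illustration of that idiom (the names are made up for the example):

    map   = { ["db", "access"] => "chunk-a", ["db", "errors"] => "chunk-b" }
    queue = []

    # reject! deletes every pair whose block returns a truthy value; the
    # block enqueues the pair and returns the (truthy) queue, so the whole
    # map is drained in one pass.
    map.reject! {|(db, table), data| queue << [db, table, data] }

    map     # => {}
    queue   # => [["db", "access", "chunk-a"], ["db", "errors", "chunk-b"]]

A failed upload leaves its chunk at the head of @queue (it is only shifted off after a successful upload), so it is retried on subsequent flushes while @error_count stays below @retry_limit.
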
@@ -241,14 +301,12 @@
end
flushed
end
- def upload(db, table, buffer)
+ def upload(db, table, data)
begin
- out = StringIO.new
- Zlib::GzipWriter.wrap(out) {|gz| gz.write buffer }
- stream = StringIO.new(out.string)
+ stream = StringIO.new(data)
@logger.debug "Uploading event logs to #{db}.#{table} table on Treasure Data (#{stream.size} bytes)"
@client.import(db, table, "msgpack.gz", stream, stream.size)
rescue TreasureData::NotFoundError