lib/td/logger/td_logger.rb in td-logger-0.3.11 vs lib/td/logger/td_logger.rb in td-logger-0.3.12

- old
+ new

@@ -7,11 +7,11 @@ require 'delegate' def new(*args, &block) obj = allocate obj.instance_eval { initialize(*args, &block) } dc = DelegateClass(obj.class).new(obj) - ObjectSpace.define_finalizer(dc, finalizer(obj)) + ObjectSpace.define_finalizer(obj, finalizer(obj)) dc end def finalizer(obj) fin = obj.method(:finalize) @@ -64,11 +64,11 @@ @queue = [] @chunk_limit = 8*1024*1024 @queue_limit = 50 - @flush_interval = 10 + @flush_interval = 2 @max_flush_interval = 300 @retry_wait = 1.0 @retry_limit = 12 @finish = false @@ -89,17 +89,29 @@ @flush_now = true @cond.signal } @upload_thread.join if @upload_thread - @map.each {|(db,table),buffer| + @queue.reject! {|db,table,data| + begin + upload(db, table, data) + true + rescue + @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}" + false + end + } + @map.reject! {|(db,table),buffer| data = buffer.flush! - upload(db, table, data) + begin + upload(db, table, data) + true + rescue + @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}" + false + end } - @queue.each {|db,table,data| - upload(ddb, data, table) - } end end def post_with_time(tag, record, time) @logger.debug { "event: #{tag} #{record.to_json}" rescue nil } @@ -116,22 +128,17 @@ @mutex.lock until @finish now = Time.now.to_i if @next_time <= now || (@flush_now && @error_count == 0) - @mutex.unlock - begin - flushed = try_flush - ensure - @mutex.lock - end + flushed = try_flush @flush_now = false end if @error_count == 0 if flushed && @flush_interval < @max_flush_interval - @flush_interval = [@flush_interval + 60, @max_flush_interval].min + @flush_interval = [@flush_interval ** 2, @max_flush_interval].min end next_wait = @flush_interval else next_wait = @retry_wait * (2 ** (@error_count-1)) end @@ -171,11 +178,10 @@ @key_set.size end def append(data) @gz << data - map = MessagePack.unpack(data) end def size @data.size end @@ -247,10 +253,11 @@ end buffer.append(data) if buffer.size > @chunk_limit + # flush this buffer data = buffer.flush! @queue << [db, table, data] @map.delete(key) @cond.signal end @@ -262,46 +269,58 @@ end true end + # assume @mutex is locked def try_flush - @mutex.synchronize do - if @queue.empty? - @map.reject! {|(db,table),buffer| - data = buffer.flush! - @queue << [db, table, data] - } - end + # force flush small buffers if queue is empty + if @queue.empty? + @map.reject! {|(db,table),buffer| + data = buffer.flush! + @queue << [db, table, data] + } end + if @queue.empty? + return false + end + flushed = false - until @queue.empty? - db, table, data = @queue.first + @mutex.unlock + begin + until @queue.empty? + db, table, data = @queue.first - begin - upload(db, table, data) - @queue.shift - @error_count = 0 - flushed = true - rescue - if @error_count < @retry_limit - @logger.error "Failed to upload event logs to Treasure Data, retrying: #{$!}" - @error_count += 1 - else - @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}" - $!.backtrace.each {|bt| - @logger.info bt - } + begin + upload(db, table, data) + @queue.shift @error_count = 0 - @queue.clear + flushed = true + + rescue + if @error_count < @retry_limit + @logger.error "Failed to upload event logs to Treasure Data, retrying: #{$!}" + @error_count += 1 + else + @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}" + $!.backtrace.each {|bt| + @logger.info bt + } + @error_count = 0 + @queue.clear + end + return nil + end - return end + + ensure + @mutex.lock end - flushed + return flushed end def upload(db, table, data) begin stream = StringIO.new(data)