lib/td/logger/td_logger.rb in td-logger-0.3.11 vs lib/td/logger/td_logger.rb in td-logger-0.3.12
- lines prefixed with "-" are from the old version (td-logger-0.3.11)
+ lines prefixed with "+" are from the new version (td-logger-0.3.12)
@@ -7,11 +7,11 @@
require 'delegate'
def new(*args, &block)
obj = allocate
obj.instance_eval { initialize(*args, &block) }
dc = DelegateClass(obj.class).new(obj)
- ObjectSpace.define_finalizer(dc, finalizer(obj))
+ ObjectSpace.define_finalizer(obj, finalizer(obj))
dc
end
def finalizer(obj)
fin = obj.method(:finalize)
@@ -64,11 +64,11 @@
@queue = []
@chunk_limit = 8*1024*1024
@queue_limit = 50
- @flush_interval = 10
+ @flush_interval = 2
@max_flush_interval = 300
@retry_wait = 1.0
@retry_limit = 12
@finish = false
@@ -89,17 +89,29 @@
@flush_now = true
@cond.signal
}
@upload_thread.join if @upload_thread
- @map.each {|(db,table),buffer|
+ @queue.reject! {|db,table,data|
+ begin
+ upload(db, table, data)
+ true
+ rescue
+ @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}"
+ false
+ end
+ }
+ @map.reject! {|(db,table),buffer|
data = buffer.flush!
- upload(db, table, data)
+ begin
+ upload(db, table, data)
+ true
+ rescue
+ @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}"
+ false
+ end
}
- @queue.each {|db,table,data|
- upload(ddb, data, table)
- }
end
end
def post_with_time(tag, record, time)
@logger.debug { "event: #{tag} #{record.to_json}" rescue nil }
@@ -116,22 +128,17 @@
@mutex.lock
until @finish
now = Time.now.to_i
if @next_time <= now || (@flush_now && @error_count == 0)
- @mutex.unlock
- begin
- flushed = try_flush
- ensure
- @mutex.lock
- end
+ flushed = try_flush
@flush_now = false
end
if @error_count == 0
if flushed && @flush_interval < @max_flush_interval
- @flush_interval = [@flush_interval + 60, @max_flush_interval].min
+ @flush_interval = [@flush_interval ** 2, @max_flush_interval].min
end
next_wait = @flush_interval
else
next_wait = @retry_wait * (2 ** (@error_count-1))
end
@@ -171,11 +178,10 @@
@key_set.size
end
def append(data)
@gz << data
- map = MessagePack.unpack(data)
end
def size
@data.size
end
@@ -247,10 +253,11 @@
end
buffer.append(data)
if buffer.size > @chunk_limit
+ # flush this buffer
data = buffer.flush!
@queue << [db, table, data]
@map.delete(key)
@cond.signal
end
@@ -262,46 +269,58 @@
end
true
end
+ # assume @mutex is locked
def try_flush
- @mutex.synchronize do
- if @queue.empty?
- @map.reject! {|(db,table),buffer|
- data = buffer.flush!
- @queue << [db, table, data]
- }
- end
+ # force flush small buffers if queue is empty
+ if @queue.empty?
+ @map.reject! {|(db,table),buffer|
+ data = buffer.flush!
+ @queue << [db, table, data]
+ }
end
+ if @queue.empty?
+ return false
+ end
+
flushed = false
- until @queue.empty?
- db, table, data = @queue.first
+ @mutex.unlock
+ begin
+ until @queue.empty?
+ db, table, data = @queue.first
- begin
- upload(db, table, data)
- @queue.shift
- @error_count = 0
- flushed = true
- rescue
- if @error_count < @retry_limit
- @logger.error "Failed to upload event logs to Treasure Data, retrying: #{$!}"
- @error_count += 1
- else
- @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}"
- $!.backtrace.each {|bt|
- @logger.info bt
- }
+ begin
+ upload(db, table, data)
+ @queue.shift
@error_count = 0
- @queue.clear
+ flushed = true
+
+ rescue
+ if @error_count < @retry_limit
+ @logger.error "Failed to upload event logs to Treasure Data, retrying: #{$!}"
+ @error_count += 1
+ else
+ @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}"
+ $!.backtrace.each {|bt|
+ @logger.info bt
+ }
+ @error_count = 0
+ @queue.clear
+ end
+ return nil
+
end
- return
end
+
+ ensure
+ @mutex.lock
end
- flushed
+ return flushed
end
def upload(db, table, data)
begin
stream = StringIO.new(data)