lib/td/logger/td_logger.rb in td-logger-0.3.24 vs lib/td/logger/td_logger.rb in td-logger-0.3.25
- old
+ new
@@ -44,15 +44,15 @@
@mutex = Mutex.new
@cond = ConditionVariable.new
@map = {} # (db,table) => buffer:String
@queue = []
- @chunk_limit = 8 * 1024 * 1024
+ @chunk_limit = options[:chunk_limit] || 8 * 1024 * 1024
@queue_limit = 50
- @flush_interval = 2
- @max_flush_interval = 300
+ @flush_interval = options[:flush_interval] || 2
+ @max_flush_interval = options[:max_flush_interval] || 300
@retry_wait = 1.0
@retry_limit = 12
@finish = false
@next_time = Time.now.to_i + @flush_interval
@@ -60,10 +60,12 @@
# start thread when the first post() is called for
# Unicorn and Passenger.
@upload_thread = nil
+ @use_unique_key = options[:use_unique_key]
+
# The calling order of finalizer registered by define_finalizer is indeterminate,
# so we should use at_exit instead for memory safety.
at_exit { close }
end
@@ -209,10 +211,12 @@
JSON.load(JSON.dump(msg)).to_msgpack
end
end
def add(db, table, msg)
+ # NOTE: TreasureData::API is defined at td-client-ruby gem
+ # https://github.com/treasure-data/td-client-ruby/blob/master/lib/td/client/api.rb
begin
TreasureData::API.validate_database_name(db)
rescue
@logger.error "TreasureDataLogger: Invalid database name #{db.inspect}: #{$!}"
raise "Invalid database name #{db.inspect}: #{$!}"
@@ -327,16 +331,18 @@
return flushed
end
def upload(db, table, data)
+ unique_key = @use_unique_key ? generate_unique_key : nil
+
begin
stream = StringIO.new(data)
- @logger.debug "Uploading event logs to #{db}.#{table} table on Treasure Data (#{stream.size} bytes)"
+ @logger.info "Uploading event logs to #{db}.#{table} table on Treasure Data (#{stream.size} bytes)"
- @client.import(db, table, "msgpack.gz", stream, stream.size)
+ @client.import(db, table, "msgpack.gz", stream, stream.size, unique_key)
rescue TreasureData::NotFoundError
unless @auto_create_table
raise $!
end
@logger.info "Creating table #{db}.#{table} on Treasure Data"
@@ -346,9 +352,18 @@
@client.create_database(db)
@client.create_log_table(db, table)
end
retry
end
+ end
+
+ # NOTE fluentd unique_id and fluent-plugin-td unique_str in reference.
+ # https://github.com/fluent/fluentd/blob/v0.12.15/lib/fluent/plugin/buf_memory.rb#L22
+ # https://github.com/treasure-data/fluent-plugin-td/blob/v0.10.27/lib/fluent/plugin/out_tdlog.rb#L225
+ def generate_unique_key(now = Time.now)
+ u1 = ((now.to_i*1000*1000+now.usec) << 12 | rand(0xfff))
+ uid = [u1 >> 32, u1 & 0xffffffff, rand(0xffffffff), rand(0xffffffff)].pack('NNNN')
+ uid.unpack('C*').map { |x| "%02x" % x }.join
end
require 'thread' # ConditionVariable
if ConditionVariable.new.method(:wait).arity == 1
# "WARNING: Running on Ruby 1.8. Ruby 1.9 is recommended."