lib/td/logger/td_logger.rb in td-logger-0.3.24 vs lib/td/logger/td_logger.rb in td-logger-0.3.25

- old
+ new

@@ -44,15 +44,15 @@ @mutex = Mutex.new @cond = ConditionVariable.new @map = {} # (db,table) => buffer:String @queue = [] - @chunk_limit = 8 * 1024 * 1024 + @chunk_limit = options[:chunk_limit] || 8 * 1024 * 1024 @queue_limit = 50 - @flush_interval = 2 - @max_flush_interval = 300 + @flush_interval = options[:flush_interval] || 2 + @max_flush_interval = options[:max_flush_interval] || 300 @retry_wait = 1.0 @retry_limit = 12 @finish = false @next_time = Time.now.to_i + @flush_interval @@ -60,10 +60,12 @@ # start thread when the first post() is called for # Unicorn and Passenger. @upload_thread = nil + @use_unique_key = options[:use_unique_key] + # The calling order of finalizer registered by define_finalizer is indeterminate, # so we should use at_exit instead for memory safety. at_exit { close } end @@ -209,10 +211,12 @@ JSON.load(JSON.dump(msg)).to_msgpack end end def add(db, table, msg) + # NOTE: TreasureData::API is defined at td-client-ruby gem + # https://github.com/treasure-data/td-client-ruby/blob/master/lib/td/client/api.rb begin TreasureData::API.validate_database_name(db) rescue @logger.error "TreasureDataLogger: Invalid database name #{db.inspect}: #{$!}" raise "Invalid database name #{db.inspect}: #{$!}" @@ -327,16 +331,18 @@ return flushed end def upload(db, table, data) + unique_key = @use_unique_key ? generate_unique_key : nil + begin stream = StringIO.new(data) - @logger.debug "Uploading event logs to #{db}.#{table} table on Treasure Data (#{stream.size} bytes)" + @logger.info "Uploading event logs to #{db}.#{table} table on Treasure Data (#{stream.size} bytes)" - @client.import(db, table, "msgpack.gz", stream, stream.size) + @client.import(db, table, "msgpack.gz", stream, stream.size, unique_key) rescue TreasureData::NotFoundError unless @auto_create_table raise $! end @logger.info "Creating table #{db}.#{table} on Treasure Data" @@ -346,9 +352,18 @@ @client.create_database(db) @client.create_log_table(db, table) end retry end + end + + # NOTE fluentd unique_id and fluent-plugin-td unique_str in reference. + # https://github.com/fluent/fluentd/blob/v0.12.15/lib/fluent/plugin/buf_memory.rb#L22 + # https://github.com/treasure-data/fluent-plugin-td/blob/v0.10.27/lib/fluent/plugin/out_tdlog.rb#L225 + def generate_unique_key(now = Time.now) + u1 = ((now.to_i*1000*1000+now.usec) << 12 | rand(0xfff)) + uid = [u1 >> 32, u1 & 0xffffffff, rand(0xffffffff), rand(0xffffffff)].pack('NNNN') + uid.unpack('C*').map { |x| "%02x" % x }.join end require 'thread' # ConditionVariable if ConditionVariable.new.method(:wait).arity == 1 # "WARNING: Running on Ruby 1.8. Ruby 1.9 is recommended."