module TreasureData module Logger class TreasureDataLogger < Fluent::Logger::LoggerBase module Finalizable require 'delegate' def new(*args, &block) obj = allocate obj.instance_eval { initialize(*args, &block) } dc = DelegateClass(obj.class).new(obj) ObjectSpace.define_finalizer(obj, finalizer(obj)) dc end def finalizer(obj) fin = obj.method(:finalize) proc {|id| fin.call } end end extend Finalizable def initialize(tag_prefix, options={}) defaults = { :auto_create_table => false, } options = defaults.merge!(options) @tag_prefix = tag_prefix @auto_create_table = !!options[:auto_create_table] apikey = options[:apikey] unless apikey raise ArgumentError, ":apikey options is required" end debug = !!options[:debug] require 'thread' require 'stringio' require 'zlib' require 'msgpack' require 'json' require 'time' require 'net/http' require 'cgi' require 'logger' require 'td-client' @logger = ::Logger.new(STDERR) if debug @logger.level = ::Logger::DEBUG else @logger.level = ::Logger::INFO end @client = TreasureData::Client.new(apikey) @mutex = Mutex.new @cond = ConditionVariable.new @map = {} # (db,table) => buffer:String @queue = [] @chunk_limit = 8*1024*1024 @queue_limit = 50 @flush_interval = 2 @max_flush_interval = 300 @retry_wait = 1.0 @retry_limit = 12 @finish = false @next_time = Time.now.to_i + @flush_interval @error_count = 0 # start thread when the first post() is called for # Unicorn and Passenger. @upload_thread = nil end attr_accessor :logger def close unless @finish @finish = true @mutex.synchronize { @flush_now = true @cond.signal } @upload_thread.join if @upload_thread @queue.reject! {|db,table,data| begin upload(db, table, data) true rescue @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}" false end } @map.reject! {|(db,table),buffer| data = buffer.flush! begin upload(db, table, data) true rescue @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}" false end } end end def post_with_time(tag, record, time) @logger.debug { "event: #{tag} #{record.to_json}" rescue nil } record[:time] ||= time.to_i tag = "#{@tag_prefix}.#{tag}" if @tag_prefix db, table = tag.split('.')[-2, 2] add(db, table, record) end def upload_main @mutex.lock until @finish now = Time.now.to_i if @next_time <= now || (@flush_now && @error_count == 0) flushed = try_flush @flush_now = false end if @error_count == 0 if flushed && @flush_interval < @max_flush_interval @flush_interval = [@flush_interval ** 2, @max_flush_interval].min end next_wait = @flush_interval else next_wait = @retry_wait * (2 ** (@error_count-1)) end @next_time = next_wait + now cond_wait(next_wait) end rescue @logger.error "Unexpected error: #{$!}" $!.backtrace.each {|bt| @logger.info bt } ensure @mutex.unlock end private MAX_KEY_CARDINALITY = 512 WARN_KEY_CARDINALITY = 256 class Buffer def initialize @key_set = {} @data = StringIO.new @gz = Zlib::GzipWriter.new(@data) end def key_set_size @key_set.size end def update_key_set(record) record.each_key {|key| @key_set[key] = true } @key_set.size end def append(data) @gz << data end def size @data.size end def flush! close @data.string end def close @gz.close unless @gz.closed? end end def to_msgpack(msg) begin msg.to_msgpack rescue NoMethodError JSON.load(JSON.dump(msg)).to_msgpack end end def add(db, table, msg) begin TreasureData::API.validate_database_name(db) rescue @logger.error "TreasureDataLogger: Invalid database name #{db.inspect}: #{$!}" raise "Invalid database name #{db.inspect}: #{$!}" end begin TreasureData::API.validate_table_name(table) rescue @logger.error "TreasureDataLogger: Invalid table name #{table.inspect}: #{$!}" raise "Invalid table name #{table.inspect}: #{$!}" end begin data = to_msgpack(msg) rescue @logger.error("TreasureDataLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}") return false end key = [db, table] @mutex.synchronize do if @queue.length > @queue_limit @logger.error("TreasureDataLogger: queue length exceeds limit. can't add new event log: #{msg.inspect}") return false end buffer = (@map[key] ||= Buffer.new) record = MessagePack.unpack(data) unless record.is_a?(Hash) @logger.error("TreasureDataLogger: record must be a Hash: #{msg.inspect}") return false end before = buffer.key_set_size after = buffer.update_key_set(record) if after > MAX_KEY_CARDINALITY @logger.error("TreasureDataLogger: kind of keys in a buffer exceeds #{MAX_KEY_CARDINALITY}.") @map.delete(key) return false end if before <= WARN_KEY_CARDINALITY && after > WARN_KEY_CARDINALITY @logger.warn("TreasureDataLogger: kind of keys in a buffer exceeds #{WARN_KEY_CARDINALITY} which is too large. please check the schema design.") end buffer.append(data) if buffer.size > @chunk_limit # flush this buffer data = buffer.flush! @queue << [db, table, data] @map.delete(key) @cond.signal end # stat upload thread if it's not run unless @upload_thread @upload_thread = Thread.new(&method(:upload_main)) end end true end # assume @mutex is locked def try_flush # force flush small buffers if queue is empty if @queue.empty? @map.reject! {|(db,table),buffer| data = buffer.flush! @queue << [db, table, data] } end if @queue.empty? return false end flushed = false @mutex.unlock begin until @queue.empty? db, table, data = @queue.first begin upload(db, table, data) @queue.shift @error_count = 0 flushed = true rescue if @error_count < @retry_limit @logger.error "Failed to upload event logs to Treasure Data, retrying: #{$!}" @error_count += 1 else @logger.error "Failed to upload event logs to Treasure Data, trashed: #{$!}" $!.backtrace.each {|bt| @logger.info bt } @error_count = 0 @queue.clear end return nil end end ensure @mutex.lock end return flushed end def upload(db, table, data) begin stream = StringIO.new(data) @logger.debug "Uploading event logs to #{db}.#{table} table on Treasure Data (#{stream.size} bytes)" @client.import(db, table, "msgpack.gz", stream, stream.size) rescue TreasureData::NotFoundError unless @auto_create_table raise $! end @logger.info "Creating table #{db}.#{table} on Treasure Data" begin @client.create_log_table(db, table) rescue TreasureData::NotFoundError @client.create_database(db) @client.create_log_table(db, table) end retry end end def finalize close end require 'thread' # ConditionVariable if ConditionVariable.new.method(:wait).arity == 1 # "WARNING: Running on Ruby 1.8. Ruby 1.9 is recommended." require 'timeout' def cond_wait(sec) Timeout.timeout(sec) { @cond.wait(@mutex) } rescue Timeout::Error end else def cond_wait(sec) @cond.wait(@mutex, sec) end end end end end