lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.4.0 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.5.0

- old
+ new

@@ -16,10 +16,11 @@
   config_param :port, :integer, :default => 27017
   attr_reader :argument
   def initialize
+    @clients = {}
     super
     require 'mongo'
     require 'msgpack'
     @argument = {:capped => false}
@@ -34,56 +35,96 @@
       @argument[:capped] = true
       @argument[:size] = Config.size_value(conf['capped_size'])
       @argument[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
     end
+    if @buffer.respond_to?(:buffer_chunk_limit)
+      @buffer.buffer_chunk_limit = available_buffer_chunk_limit
+    else
+      $log.warn "#{Fluent::VERSION} does not have :buffer_chunk_limit. Be careful when insert large documents to MongoDB"
+    end
+
     # MongoDB uses BSON's Date for time.
     def @timef.format_nocache(time)
       time
     end
   end
   def start
     super
-    @client = get_or_create_collection
   end
   def shutdown
     # Mongo::Connection checks alive or closed myself
-    @client.db.connection.close
+    @clients.values.each { |client| client.db.connection.close }
     super
   end
   def format(tag, time, record)
     record.to_msgpack
   end
   def write(chunk)
+    operate(@collection, collect_records(chunk))
+  end
+
+  def format_collection_name(collection_name)
+    formatted = collection_name
+    formatted = formatted.gsub(@remove_prefix_collection, '') if @remove_prefix_collection
+    formatted = formatted.gsub(/(^\.+)|(\.+$)/, '')
+    formatted = @collection if formatted.size == 0 # set default for nil tag
+    formatted
+  end
+
+  private
+
+  def operate(collection_name, records)
+    get_or_create_collection(collection_name).insert(records)
+  end
+
+  def collect_records(chunk)
     records = []
     chunk.msgpack_each { |record|
       record[@time_key] = Time.at(record[@time_key]) if @include_time_key
       records << record
     }
-    operate(records)
+    records
   end
-  private
+  def get_or_create_collection(collection_name)
+    collection_name = format_collection_name(collection_name)
+    return @clients[collection_name] if @clients[collection_name]
-  def get_or_create_collection
-    db = Mongo::Connection.new(@host, @port).db(@database)
-    if db.collection_names.include?(@collection)
-      collection = db.collection(@collection)
-      return collection if @argument[:capped] == collection.capped? # TODO: Verify capped configuration
-
-      # raise Exception if old collection does not match lastest configuration
-      raise ConfigError, "New configuration is different from existing collection"
+    @db ||= Mongo::Connection.new(@host, @port).db(@database)
+    if @db.collection_names.include?(collection_name)
+      collection = @db.collection(collection_name)
+      unless @argument[:capped] == collection.capped? # TODO: Verify capped configuration
+        # raise Exception if old collection does not match lastest configuration
+        raise ConfigError, "New configuration is different from existing collection"
+      end
+    else
+      collection = @db.create_collection(collection_name, @argument)
     end
-    db.create_collection(@collection, @argument)
+    @clients[collection_name] = collection
   end
-  def operate(records)
-    @client.insert(records)
+  # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
+  LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2
+  LIMIT_AFTER_v1_8 = 10 * 1024 * 1024 # 10MB = 16MB / 2 + alpha
+
+  def available_buffer_chunk_limit
+    limit = mongod_version >= "1.8.0" ? LIMIT_AFTER_v1_8 : LIMIT_BEFORE_v1_8 # TODO: each version comparison
+    if @buffer.buffer_chunk_limit > limit
+      $log.warn ":buffer_chunk_limit(#{@buffer.buffer_chunk_limit}) is large. Reset :buffer_chunk_limit with #{limit}"
+      limit
+    else
+      @buffer.buffer_chunk_limit
+    end
+  end
+
+  def mongod_version
+    Mongo::Connection.new.db('admin').command('serverStatus' => 1)['version']
   end
 end
 end