lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.7.12 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.7.13

- old
+ new

@@ -1,5 +1,7 @@ +require 'fluent/output' + module Fluent class MongoOutput < BufferedOutput Plugin.register_output('mongo', self) require 'fluent/plugin/mongo_util' @@ -34,10 +36,13 @@ config_param :ssl_key, :string, :default => nil config_param :ssl_key_pass_phrase, :string, :default => nil, :secret => true config_param :ssl_verify, :bool, :default => false config_param :ssl_ca_cert, :string, :default => nil + # For older (1.7 or earlier) MongoDB versions + config_param :mongodb_smaller_bson_limit, :bool, :default => false + attr_reader :collection_options, :connection_options unless method_defined?(:log) define_method(:log) { $log } end @@ -50,11 +55,35 @@ @clients = {} @connection_options = {} @collection_options = {:capped => false} end + # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. + LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 + LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 + def configure(conf) + if conf.has_key?('buffer_chunk_limit') + configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit']) + estimated_limit_size = LIMIT_AFTER_v1_8 + estimated_limit_size_conf = '8m' + if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) + estimated_limit_size = LIMIT_BEFORE_v1_8 + estimated_limit_size_conf = '2m' + end + if configured_chunk_limit_size > estimated_limit_size + log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}" + conf['buffer_chunk_limit'] = estimated_limit_size_conf + end + else + if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) + conf['buffer_chunk_limit'] = '2m' + else + conf['buffer_chunk_limit'] = '8m' + end + end + super if conf.has_key?('tag_mapped') @tag_mapped = true @disable_collection_check = true if @disable_collection_check.nil? @@ -100,13 +129,10 @@ def start # Non tag mapped mode, we can check collection configuration before server start. get_or_create_collection(@collection) unless @tag_mapped - # From configure for avoding complex method dependency... - @buffer.buffer_chunk_limit = available_buffer_chunk_limit - super end def shutdown # Mongo::Connection checks alive or closed myself @@ -228,48 +254,9 @@ end def get_connection db = Mongo::MongoClient.new(@host, @port, @connection_options).db(@database) authenticate(db) - end - - # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. - LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 - LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 - - def available_buffer_chunk_limit - begin - limit = mongod_version >= "1.8.0" ? LIMIT_AFTER_v1_8 : LIMIT_BEFORE_v1_8 - rescue Mongo::ConnectionFailure => e - log.fatal "Failed to connect to 'mongod'. Please restart 'fluentd' after 'mongod' started: #{e}" - exit! - rescue Mongo::OperationFailure => e - log.fatal "Operation failed. Probably, 'mongod' needs an authentication: #{e}" - exit! - rescue Exception => e - log.warn "mongo unknown error #{e}, set #{LIMIT_BEFORE_v1_8} to chunk limit" - limit = LIMIT_BEFORE_v1_8 - end - - if @buffer.buffer_chunk_limit > limit - log.warn ":buffer_chunk_limit(#{@buffer.buffer_chunk_limit}) is large. Reset :buffer_chunk_limit with #{limit}" - limit - else - @buffer.buffer_chunk_limit - end - end - - def mongod_version - version = nil - - begin - version = get_connection.command('buildInfo' => 1)['version'] - rescue Mongo::OperationFailure - # fallback for buggy mongod version support - version = authenticate(Mongo::MongoClient.new(@host, @port, @connection_options).db('admin')).command('buildInfo' => 1)['version'] - end - - version end def replace_key_of_hash(hash_or_array, pattern, replacement) case hash_or_array when Array