lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.7.12 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.7.13
- old
+ new
@@ -1,5 +1,7 @@
+require 'fluent/output'
+
module Fluent
class MongoOutput < BufferedOutput
Plugin.register_output('mongo', self)
require 'fluent/plugin/mongo_util'
@@ -34,10 +36,13 @@
config_param :ssl_key, :string, :default => nil
config_param :ssl_key_pass_phrase, :string, :default => nil, :secret => true
config_param :ssl_verify, :bool, :default => false
config_param :ssl_ca_cert, :string, :default => nil
+ # For older (1.7 or earlier) MongoDB versions
+ config_param :mongodb_smaller_bson_limit, :bool, :default => false
+
attr_reader :collection_options, :connection_options
unless method_defined?(:log)
define_method(:log) { $log }
end
@@ -50,11 +55,35 @@
@clients = {}
@connection_options = {}
@collection_options = {:capped => false}
end
+ # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
+ LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2
+ LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2
+
def configure(conf)
+ if conf.has_key?('buffer_chunk_limit')
+ configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit'])
+ estimated_limit_size = LIMIT_AFTER_v1_8
+ estimated_limit_size_conf = '8m'
+ if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit'])
+ estimated_limit_size = LIMIT_BEFORE_v1_8
+ estimated_limit_size_conf = '2m'
+ end
+ if configured_chunk_limit_size > estimated_limit_size
+ log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}"
+ conf['buffer_chunk_limit'] = estimated_limit_size_conf
+ end
+ else
+ if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit'])
+ conf['buffer_chunk_limit'] = '2m'
+ else
+ conf['buffer_chunk_limit'] = '8m'
+ end
+ end
+
super
if conf.has_key?('tag_mapped')
@tag_mapped = true
@disable_collection_check = true if @disable_collection_check.nil?
@@ -100,13 +129,10 @@
def start
# Non tag mapped mode, we can check collection configuration before server start.
get_or_create_collection(@collection) unless @tag_mapped
- # From configure for avoding complex method dependency...
- @buffer.buffer_chunk_limit = available_buffer_chunk_limit
-
super
end
def shutdown
# Mongo::Connection checks alive or closed myself
@@ -228,48 +254,9 @@
end
def get_connection
db = Mongo::MongoClient.new(@host, @port, @connection_options).db(@database)
authenticate(db)
- end
-
- # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
- LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2
- LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2
-
- def available_buffer_chunk_limit
- begin
- limit = mongod_version >= "1.8.0" ? LIMIT_AFTER_v1_8 : LIMIT_BEFORE_v1_8
- rescue Mongo::ConnectionFailure => e
- log.fatal "Failed to connect to 'mongod'. Please restart 'fluentd' after 'mongod' started: #{e}"
- exit!
- rescue Mongo::OperationFailure => e
- log.fatal "Operation failed. Probably, 'mongod' needs an authentication: #{e}"
- exit!
- rescue Exception => e
- log.warn "mongo unknown error #{e}, set #{LIMIT_BEFORE_v1_8} to chunk limit"
- limit = LIMIT_BEFORE_v1_8
- end
-
- if @buffer.buffer_chunk_limit > limit
- log.warn ":buffer_chunk_limit(#{@buffer.buffer_chunk_limit}) is large. Reset :buffer_chunk_limit with #{limit}"
- limit
- else
- @buffer.buffer_chunk_limit
- end
- end
-
- def mongod_version
- version = nil
-
- begin
- version = get_connection.command('buildInfo' => 1)['version']
- rescue Mongo::OperationFailure
- # fallback for buggy mongod version support
- version = authenticate(Mongo::MongoClient.new(@host, @port, @connection_options).db('admin')).command('buildInfo' => 1)['version']
- end
-
- version
end
def replace_key_of_hash(hash_or_array, pattern, replacement)
case hash_or_array
when Array