lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.8.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.0.0.rc1
- old
+ new
@@ -1,31 +1,28 @@
-require 'fluent/output'
+require 'mongo'
+require 'msgpack'
+require 'fluent/plugin/output'
+require 'fluent/plugin/mongo_auth'
+require 'fluent/plugin/logger_support'
-module Fluent
- class MongoOutput < BufferedOutput
- Plugin.register_output('mongo', self)
+module Fluent::Plugin
+ class MongoOutput < Output
+ Fluent::Plugin.register_output('mongo', self)
- unless method_defined?(:log)
- define_method(:log) { $log }
- end
+ helpers :event_emitter, :inject, :compat_parameters
- require 'fluent/plugin/mongo_auth'
- include MongoAuthParams
- include MongoAuth
- require 'fluent/plugin/logger_support'
- include LoggerSupport
+ include Fluent::MongoAuthParams
+ include Fluent::MongoAuth
+ include Fluent::LoggerSupport
- include SetTagKeyMixin
- config_set_default :include_tag_key, false
+ DEFAULT_BUFFER_TYPE = "memory"
- include SetTimeKeyMixin
+ config_set_default :include_tag_key, false
config_set_default :include_time_key, true
- desc "MongoDB connection string"
- config_param :connection_string, :default => nil
desc "MongoDB database"
- config_param :database, :string, :default => nil
+ config_param :database, :string
desc "MongoDB collection"
config_param :collection, :string, default: 'untagged'
desc "MongoDB host"
config_param :host, :string, default: 'localhost'
desc "MongoDB port"
@@ -51,68 +48,67 @@
config_param :ssl_key, :string, default: nil
config_param :ssl_key_pass_phrase, :string, default: nil, secret: true
config_param :ssl_verify, :bool, default: false
config_param :ssl_ca_cert, :string, default: nil
+ config_section :buffer do
+ config_set_default :@type, DEFAULT_BUFFER_TYPE
+ config_set_default :chunk_keys, ['tag']
+ end
+
attr_reader :client_options, :collection_options
def initialize
super
- require 'mongo'
- require 'msgpack'
-
- @nodes = nil
@client_options = {}
@collection_options = {capped: false}
end
# Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2
LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2
def configure(conf)
if conf.has_key?('buffer_chunk_limit')
- configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit'])
+ configured_chunk_limit_size = Fluent::Config.size_value(conf['buffer_chunk_limit'])
estimated_limit_size = LIMIT_AFTER_v1_8
estimated_limit_size_conf = '8m'
- if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit'])
+ if conf.has_key?('mongodb_smaller_bson_limit') && Fluent::Config.bool_value(conf['mongodb_smaller_bson_limit'])
estimated_limit_size = LIMIT_BEFORE_v1_8
estimated_limit_size_conf = '2m'
end
if configured_chunk_limit_size > estimated_limit_size
log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}"
conf['buffer_chunk_limit'] = estimated_limit_size_conf
end
else
- if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit'])
+ if conf.has_key?('mongodb_smaller_bson_limit') && Fluent::Config.bool_value(conf['mongodb_smaller_bson_limit'])
conf['buffer_chunk_limit'] = '2m'
else
conf['buffer_chunk_limit'] = '8m'
end
end
+ compat_parameters_convert(conf, :inject)
super
- if @connection_string.nil? && @database.nil?
- raise Fluent::ConfigError, "connection_string or database parameter is required"
- end
-
unless @ignore_invalid_record
log.warn "Since v0.8, invalid record detection will be removed because mongo driver v2.x and API spec don't provide it. You may lose invalid records, so you should not send such records to mongo plugin"
end
if conf.has_key?('tag_mapped')
- @tag_mapped = true
+ log.warn "'tag_mapped' feature is replaced with built-in config placeholder. Please consider to use 'collection ${tag}'."
+ @collection = '${tag}'
end
- raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection')
+ raise Fluent::ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection')
if conf.has_key?('capped')
- raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
+ raise Fluent::ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
@collection_options[:capped] = true
- @collection_options[:size] = Config.size_value(conf['capped_size'])
- @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
+ @collection_options[:size] = Fluent::Config.size_value(conf['capped_size'])
+ @collection_options[:max] = Fluent::Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
end
if remove_tag_prefix = conf['remove_tag_prefix']
@remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
end
@@ -126,17 +122,11 @@
@client_options[:ssl_key] = @ssl_key
@client_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase
@client_options[:ssl_verify] = @ssl_verify
@client_options[:ssl_ca_cert] = @ssl_ca_cert
end
- @nodes = ["#{@host}:#{@port}"] if @nodes.nil?
- # MongoDB uses BSON's Date for time.
- def @timef.format_nocache(time)
- time
- end
-
configure_logger(@mongo_log_level)
log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}"
end
@@ -161,31 +151,39 @@
def format(tag, time, record)
[time, record].to_msgpack
end
+ def formatted_to_msgpack_binary
+ true
+ end
+
+ def multi_workers_ready?
+ true
+ end
+
def write(chunk)
- collection_name = @tag_mapped ? chunk.key : @collection
+ collection_name = extract_placeholders(@collection, chunk.metadata)
operate(format_collection_name(collection_name), collect_records(chunk))
end
private
def client
- if @connection_string
- Mongo::Client.new(@connection_string)
- else
- @client_options[:database] = @database
- @client_options[:user] = @user if @user
- @client_options[:password] = @password if @password
- Mongo::Client.new(@nodes, @client_options)
- end
+ @client_options[:database] = @database
+ @client_options[:user] = @user if @user
+ @client_options[:password] = @password if @password
+ Mongo::Client.new(["#{@host}:#{@port}"], @client_options)
end
def collect_records(chunk)
records = []
+ time_key = @inject_config.time_key
+ tag = chunk.metadata.tag
chunk.msgpack_each {|time, record|
- record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key
+ record = inject_values_to_record(tag, time, record)
+ # MongoDB uses BSON's Date for time.
+ record[time_key] = Time.at(time || record[time_key]) if time_key
records << record
}
records
end