lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.8.0 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.8.1
- old
+ new
@@ -18,12 +18,14 @@
config_set_default :include_tag_key, false
include SetTimeKeyMixin
config_set_default :include_time_key, true
+ desc "MongoDB connection string"
+ config_param :connection_string, :default => nil
desc "MongoDB database"
- config_param :database, :string
+ config_param :database, :string, :default => nil
desc "MongoDB collection"
config_param :collection, :string, default: 'untagged'
desc "MongoDB host"
config_param :host, :string, default: 'localhost'
desc "MongoDB port"
@@ -57,10 +59,11 @@
super
require 'mongo'
require 'msgpack'
+ @nodes = nil
@client_options = {}
@collection_options = {capped: false}
end
# Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
@@ -88,10 +91,14 @@
end
end
super
+ if @connection_string.nil? && @database.nil?
+ raise Fluent::ConfigError, "connection_string or database parameter is required"
+ end
+
unless @ignore_invalid_record
log.warn "Since v0.8, invalid record detection will be removed because mongo driver v2.x and API spec don't provide it. You may lose invalid records, so you should not send such records to mongo plugin"
end
if conf.has_key?('tag_mapped')
@@ -119,10 +126,11 @@
@client_options[:ssl_key] = @ssl_key
@client_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase
@client_options[:ssl_verify] = @ssl_verify
@client_options[:ssl_ca_cert] = @ssl_ca_cert
end
+ @nodes = ["#{@host}:#{@port}"] if @nodes.nil?
# MongoDB uses BSON's Date for time.
def @timef.format_nocache(time)
time
end
@@ -161,13 +169,17 @@
end
private
def client
- @client_options[:database] = @database
- @client_options[:user] = @user if @user
- @client_options[:password] = @password if @password
- Mongo::Client.new(["#{@host}:#{@port}"], @client_options)
+ if @connection_string
+ Mongo::Client.new(@connection_string)
+ else
+ @client_options[:database] = @database
+ @client_options[:user] = @user if @user
+ @client_options[:password] = @password if @password
+ Mongo::Client.new(@nodes, @client_options)
+ end
end
def collect_records(chunk)
records = []
chunk.msgpack_each {|time, record|