lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.1.2 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.2.0
- old
+ new
@@ -171,20 +171,21 @@
true
end
def write(chunk)
collection_name = extract_placeholders(@collection, chunk.metadata)
- operate(format_collection_name(collection_name), collect_records(chunk))
+ database_name = extract_placeholders(@database, chunk.metadata)
+ operate(database_name, format_collection_name(collection_name), collect_records(chunk))
end
private
- def client
+ def client(database = @database)
if @connection_string
Mongo::Client.new(@connection_string)
else
- @client_options[:database] = @database
+ @client_options[:database] = database
@client_options[:user] = @user if @user
@client_options[:password] = @password if @password
Mongo::Client.new(@nodes, @client_options)
end
end
@@ -225,11 +226,12 @@
else
@client.database.collection_names.include?(name)
end
end
- def get_collection(name, options)
+ def get_collection(database, name, options)
+ @client = client(database) if database && @database != database
return @client[name] if @collections[name]
unless collection_exists?(name)
log.trace "Create collection #{name} with options #{options}"
@client[name, options].create
@@ -240,11 +242,11 @@
def forget_collection(name)
@collections.delete(name)
end
- def operate(collection, records)
+ def operate(database, collection, records)
begin
if @replace_dot_in_key_with
records.map! do |r|
replace_key_of_hash(r, ".", @replace_dot_in_key_with)
end
@@ -253,10 +255,10 @@
records.map! do |r|
replace_key_of_hash(r, /^\$/, @replace_dollar_in_key_with)
end
end
- get_collection(collection, @collection_options).insert_many(records)
+ get_collection(database, collection, @collection_options).insert_many(records)
rescue Mongo::Error::BulkWriteError => e
log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON."
forget_collection(collection)
rescue ArgumentError => e
log.warn e