lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.0.0 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.1.0
- old
+ new
@@ -152,10 +152,11 @@
end
def start
@client = client
@client = authenticate(@client)
+ @collections = {}
super
end
def shutdown
@client.close
@@ -200,11 +201,11 @@
end
end
def collect_records(chunk)
records = []
- time_key = @inject_config.time_key
+ time_key = @inject_config.time_key if @inject_config
tag = chunk.metadata.tag
chunk.msgpack_each {|time, record|
record = inject_values_to_record(tag, time, record)
# MongoDB uses BSON's Date for time.
record[time_key] = Time.at(time || record[time_key]) if time_key
@@ -221,10 +222,40 @@
formatted = formatted.gsub(FORMAT_COLLECTION_NAME_RE, '')
formatted = @collection if formatted.size == 0 # set default for nil tag
formatted
end
+ def list_collections_enabled?
+ @client.cluster.next_primary(false).features.list_collections_enabled?
+ end
+
+ def collection_exists?(name)
+ if list_collections_enabled?
+ r = @client.database.command(
+ { :listCollections => 1, :filter => { :name => name } }
+ ).first
+ r[:ok] && r[:cursor][:firstBatch].size == 1
+ else
+ @client.database.collection_names.include?(name)
+ end
+ end
+
+ def get_collection(name, options)
+ return @client[name] if @collections[name]
+
+ unless collection_exists?(name)
+ log.trace "Create collection #{name} with options #{options}"
+ @client[name, options].create
+ end
+ @collections[name] = true
+ @client[name]
+ end
+
+ def forget_collection(name)
+ @collections.delete(name)
+ end
+
def operate(collection, records)
begin
if @replace_dot_in_key_with
records.map! do |r|
replace_key_of_hash(r, ".", @replace_dot_in_key_with)
@@ -234,12 +265,13 @@
records.map! do |r|
replace_key_of_hash(r, /^\$/, @replace_dollar_in_key_with)
end
end
- @client[collection, @collection_options].insert_many(records)
+ get_collection(collection, @collection_options).insert_many(records)
rescue Mongo::Error::BulkWriteError => e
log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON."
+ forget_collection(collection)
rescue ArgumentError => e
log.warn e
end
records
end