lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.0.0 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.1.0

- old
+ new

@@ -152,10 +152,11 @@ end def start @client = client @client = authenticate(@client) + @collections = {} super end def shutdown @client.close @@ -200,11 +201,11 @@ end end def collect_records(chunk) records = [] - time_key = @inject_config.time_key + time_key = @inject_config.time_key if @inject_config tag = chunk.metadata.tag chunk.msgpack_each {|time, record| record = inject_values_to_record(tag, time, record) # MongoDB uses BSON's Date for time. record[time_key] = Time.at(time || record[time_key]) if time_key @@ -221,10 +222,40 @@ formatted = formatted.gsub(FORMAT_COLLECTION_NAME_RE, '') formatted = @collection if formatted.size == 0 # set default for nil tag formatted end + def list_collections_enabled? + @client.cluster.next_primary(false).features.list_collections_enabled? + end + + def collection_exists?(name) + if list_collections_enabled? + r = @client.database.command( + { :listCollections => 1, :filter => { :name => name } } + ).first + r[:ok] && r[:cursor][:firstBatch].size == 1 + else + @client.database.collection_names.include?(name) + end + end + + def get_collection(name, options) + return @client[name] if @collections[name] + + unless collection_exists?(name) + log.trace "Create collection #{name} with options #{options}" + @client[name, options].create + end + @collections[name] = true + @client[name] + end + + def forget_collection(name) + @collections.delete(name) + end + def operate(collection, records) begin if @replace_dot_in_key_with records.map! do |r| replace_key_of_hash(r, ".", @replace_dot_in_key_with) @@ -234,12 +265,13 @@ records.map! do |r| replace_key_of_hash(r, /^\$/, @replace_dollar_in_key_with) end end - @client[collection, @collection_options].insert_many(records) + get_collection(collection, @collection_options).insert_many(records) rescue Mongo::Error::BulkWriteError => e log.warn "#{records.size - e.result["n_inserted"]} documents are not inserted. Maybe these documents are invalid as a BSON." + forget_collection(collection) rescue ArgumentError => e log.warn e end records end