lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.12 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.13
- old
+ new
@@ -18,10 +18,11 @@
config_param :host, :string, :default => 'localhost'
config_param :port, :integer, :default => 27017
config_param :ignore_invalid_record, :bool, :default => false
config_param :disable_collection_check, :bool, :default => nil
config_param :safe, :bool, :default => true
+ config_param :exclude_broken_fields, :string, :default => nil
# tag mapping mode
config_param :tag_mapped, :bool, :default => false
config_param :remove_tag_prefix, :string, :default => nil
@@ -50,10 +51,12 @@
if remove_tag_prefix = conf['remove_tag_prefix']
@remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix))
end
+ @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields
+
# capped configuration
if conf.has_key?('capped')
raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
@collection_options[:capped] = true
@collection_options[:size] = Config.size_value(conf['capped_size'])
@@ -134,9 +137,14 @@
def operate_invalid_records(collection, records)
converted_records = records.map { |record|
new_record = {}
new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key
new_record[@time_key] = record.delete(@time_key)
+ if @exclude_broken_fields
+ @exclude_broken_fields.each { |key|
+ new_record[key] = record.delete(key)
+ }
+ end
new_record[BROKEN_DATA_KEY] = BSON::Binary.new(Marshal.dump(record))
new_record
}
collection.insert(converted_records) # Should create another collection like name_broken?
end