lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.12 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.13

- old
+ new

@@ -18,10 +18,11 @@ config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 27017 config_param :ignore_invalid_record, :bool, :default => false config_param :disable_collection_check, :bool, :default => nil config_param :safe, :bool, :default => true + config_param :exclude_broken_fields, :string, :default => nil # tag mapping mode config_param :tag_mapped, :bool, :default => false config_param :remove_tag_prefix, :string, :default => nil @@ -50,10 +51,12 @@ if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end + @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields + # capped configuration if conf.has_key?('capped') raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') @collection_options[:capped] = true @collection_options[:size] = Config.size_value(conf['capped_size']) @@ -134,9 +137,14 @@ def operate_invalid_records(collection, records) converted_records = records.map { |record| new_record = {} new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key new_record[@time_key] = record.delete(@time_key) + if @exclude_broken_fields + @exclude_broken_fields.each { |key| + new_record[key] = record.delete(key) + } + end new_record[BROKEN_DATA_KEY] = BSON::Binary.new(Marshal.dump(record)) new_record } collection.insert(converted_records) # Should create another collection like name_broken? end