lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.2

- old
+ new

@@ -13,25 +13,27 @@ config_param :database, :string config_param :collection, :string, :default => 'untagged' config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 27017 config_param :ignore_invalid_record, :bool, :default => false + config_param :safe, :bool, :default => true # tag mapping mode config_param :tag_mapped, :bool, :default => false config_param :remove_tag_prefix, :string, :default => nil - attr_reader :argument + attr_reader :collection_options def initialize super require 'mongo' require 'fluent/plugin/mongo_ext' require 'msgpack' @clients = {} - @argument = {:capped => false} + @connection_options = {} + @collection_options = {:capped => false} end def configure(conf) super @@ -43,21 +45,23 @@ end # capped configuration if conf.has_key?('capped') raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') - @argument[:capped] = true - @argument[:size] = Config.size_value(conf['capped_size']) - @argument[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') + @collection_options[:capped] = true + @collection_options[:size] = Config.size_value(conf['capped_size']) + @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') end if @buffer.respond_to?(:buffer_chunk_limit) @buffer.buffer_chunk_limit = available_buffer_chunk_limit else $log.warn "#{Fluent::VERSION} does not have :buffer_chunk_limit. Be careful when insert large documents to MongoDB" end + @connection_options[:safe] = @safe + # MongoDB uses BSON's Date for time. def @timef.format_nocache(time) time end @@ -88,39 +92,46 @@ end def write(chunk) # TODO: See emit comment collection_name = @tag_mapped ? chunk.key : @collection - operate(collection_name, collect_records(chunk)) + operate(get_or_create_collection(collection_name), collect_records(chunk)) end private INSERT_ARGUMENT = {:collect_on_error => true} BROKEN_DATA_KEY = '__broken_data' - def operate(collection_name, records) - collection = get_or_create_collection(collection_name) - record_ids, error_records = collection.insert(records, INSERT_ARGUMENT) - if error_records - if @ignore_invalid_record - $log.warn "Ignore #{error_records.size} documents" + def operate(collection, records) + begin + record_ids, error_records = collection.insert(records, INSERT_ARGUMENT) + if !@ignore_invalid_record and error_records.size > 0 + operate_invalid_records(collection, error_records) + end + rescue Mongo::OperationFailure => e + # Probably, all records of _records_ are broken... + if e.error_code == 13066 # 13066 means "Message contains no documents" + operate_invalid_records(collection, records) unless @ignore_invalid_record else - # Should create another collection like name_broken? - converted_records = error_records.map { |record| - new_record = {} - new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key - new_record[@time_key] = record.delete(@time_key) - new_record[BROKEN_DATA_KEY] = Marshal.dump(record) # Should use BSON::ByteBuffer - new_record - } - collection.insert(converted_records) + raise e end end records end + def operate_invalid_records(collection, records) + converted_records = records.map { |record| + new_record = {} + new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key + new_record[@time_key] = record.delete(@time_key) + new_record[BROKEN_DATA_KEY] = Marshal.dump(record) # Should use BSON::ByteBuffer + new_record + } + collection.insert(converted_records) # Should create another collection like name_broken? + end + def collect_records(chunk) records = [] chunk.msgpack_each { |time, record| record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key records << record @@ -143,22 +154,22 @@ return @clients[collection_name] if @clients[collection_name] @db ||= get_connection if @db.collection_names.include?(collection_name) collection = @db.collection(collection_name) - unless @argument[:capped] == collection.capped? # TODO: Verify capped configuration + unless @collection_options[:capped] == collection.capped? # TODO: Verify capped configuration # raise Exception if old collection does not match lastest configuration raise ConfigError, "New configuration is different from existing collection" end else - collection = @db.create_collection(collection_name, @argument) + collection = @db.create_collection(collection_name, @collection_options) end @clients[collection_name] = collection end def get_connection - Mongo::Connection.new(@host, @port).db(@database) + Mongo::Connection.new(@host, @port, @connection_options).db(@database) end # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 LIMIT_AFTER_v1_8 = 10 * 1024 * 1024 # 10MB = 16MB / 2 + alpha