lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.6.2
- old
+ new
@@ -13,25 +13,27 @@
config_param :database, :string
config_param :collection, :string, :default => 'untagged'
config_param :host, :string, :default => 'localhost'
config_param :port, :integer, :default => 27017
config_param :ignore_invalid_record, :bool, :default => false
+ config_param :safe, :bool, :default => true
# tag mapping mode
config_param :tag_mapped, :bool, :default => false
config_param :remove_tag_prefix, :string, :default => nil
- attr_reader :argument
+ attr_reader :collection_options
def initialize
super
require 'mongo'
require 'fluent/plugin/mongo_ext'
require 'msgpack'
@clients = {}
- @argument = {:capped => false}
+ @connection_options = {}
+ @collection_options = {:capped => false}
end
def configure(conf)
super
@@ -43,21 +45,23 @@
end
# capped configuration
if conf.has_key?('capped')
raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
- @argument[:capped] = true
- @argument[:size] = Config.size_value(conf['capped_size'])
- @argument[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
+ @collection_options[:capped] = true
+ @collection_options[:size] = Config.size_value(conf['capped_size'])
+ @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
end
if @buffer.respond_to?(:buffer_chunk_limit)
@buffer.buffer_chunk_limit = available_buffer_chunk_limit
else
$log.warn "#{Fluent::VERSION} does not have :buffer_chunk_limit. Be careful when insert large documents to MongoDB"
end
+ @connection_options[:safe] = @safe
+
# MongoDB uses BSON's Date for time.
def @timef.format_nocache(time)
time
end
@@ -88,39 +92,46 @@
end
def write(chunk)
# TODO: See emit comment
collection_name = @tag_mapped ? chunk.key : @collection
- operate(collection_name, collect_records(chunk))
+ operate(get_or_create_collection(collection_name), collect_records(chunk))
end
private
INSERT_ARGUMENT = {:collect_on_error => true}
BROKEN_DATA_KEY = '__broken_data'
- def operate(collection_name, records)
- collection = get_or_create_collection(collection_name)
- record_ids, error_records = collection.insert(records, INSERT_ARGUMENT)
- if error_records
- if @ignore_invalid_record
- $log.warn "Ignore #{error_records.size} documents"
+ def operate(collection, records)
+ begin
+ record_ids, error_records = collection.insert(records, INSERT_ARGUMENT)
+ if !@ignore_invalid_record and error_records.size > 0
+ operate_invalid_records(collection, error_records)
+ end
+ rescue Mongo::OperationFailure => e
+ # Probably, all records of _records_ are broken...
+ if e.error_code == 13066 # 13066 means "Message contains no documents"
+ operate_invalid_records(collection, records) unless @ignore_invalid_record
else
- # Should create another collection like name_broken?
- converted_records = error_records.map { |record|
- new_record = {}
- new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key
- new_record[@time_key] = record.delete(@time_key)
- new_record[BROKEN_DATA_KEY] = Marshal.dump(record) # Should use BSON::ByteBuffer
- new_record
- }
- collection.insert(converted_records)
+ raise e
end
end
records
end
+ def operate_invalid_records(collection, records)
+ converted_records = records.map { |record|
+ new_record = {}
+ new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key
+ new_record[@time_key] = record.delete(@time_key)
+ new_record[BROKEN_DATA_KEY] = Marshal.dump(record) # Should use BSON::ByteBuffer
+ new_record
+ }
+ collection.insert(converted_records) # Should create another collection like name_broken?
+ end
+
def collect_records(chunk)
records = []
chunk.msgpack_each { |time, record|
record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key
records << record
@@ -143,22 +154,22 @@
return @clients[collection_name] if @clients[collection_name]
@db ||= get_connection
if @db.collection_names.include?(collection_name)
collection = @db.collection(collection_name)
- unless @argument[:capped] == collection.capped? # TODO: Verify capped configuration
+ unless @collection_options[:capped] == collection.capped? # TODO: Verify capped configuration
# raise Exception if old collection does not match lastest configuration
raise ConfigError, "New configuration is different from existing collection"
end
else
- collection = @db.create_collection(collection_name, @argument)
+ collection = @db.create_collection(collection_name, @collection_options)
end
@clients[collection_name] = collection
end
def get_connection
- Mongo::Connection.new(@host, @port).db(@database)
+ Mongo::Connection.new(@host, @port, @connection_options).db(@database)
end
# Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON.
LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2
LIMIT_AFTER_v1_8 = 10 * 1024 * 1024 # 10MB = 16MB / 2 + alpha