lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.7.0 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.7.1

- old
+ new

@@ -1,231 +1,235 @@ module Fluent + class MongoOutput < BufferedOutput + Plugin.register_output('mongo', self) + require 'fluent/plugin/mongo_util' + include MongoUtil -class MongoOutput < BufferedOutput - Fluent::Plugin.register_output('mongo', self) + include SetTagKeyMixin + config_set_default :include_tag_key, false - require 'fluent/plugin/mongo_util' - include MongoUtil + include SetTimeKeyMixin + config_set_default :include_time_key, true - include SetTagKeyMixin - config_set_default :include_tag_key, false + config_param :database, :string + config_param :collection, :string, :default => 'untagged' + config_param :host, :string, :default => 'localhost' + config_param :port, :integer, :default => 27017 + config_param :ignore_invalid_record, :bool, :default => false + config_param :disable_collection_check, :bool, :default => nil + config_param :exclude_broken_fields, :string, :default => nil + config_param :write_concern, :integer, :default => nil - include SetTimeKeyMixin - config_set_default :include_time_key, true + # tag mapping mode + config_param :tag_mapped, :bool, :default => false + config_param :remove_tag_prefix, :string, :default => nil - config_param :database, :string - config_param :collection, :string, :default => 'untagged' - config_param :host, :string, :default => 'localhost' - config_param :port, :integer, :default => 27017 - config_param :ignore_invalid_record, :bool, :default => false - config_param :disable_collection_check, :bool, :default => nil - config_param :exclude_broken_fields, :string, :default => nil - config_param :write_concern, :integer, :default => nil + attr_reader :collection_options, :connection_options - # tag mapping mode - config_param :tag_mapped, :bool, :default => false - config_param :remove_tag_prefix, :string, :default => nil + def initialize + super + require 'mongo' + require 'msgpack' - attr_reader :collection_options, :connection_options + @clients = {} + @connection_options = {} + @collection_options = {:capped => false} + end - def initialize - super - require 'mongo' - require 'msgpack' + def configure(conf) + super - @clients = {} - @connection_options = {} - @collection_options = {:capped => false} - end + if conf.has_key?('tag_mapped') + @tag_mapped = true + @disable_collection_check = true if @disable_collection_check.nil? + else + @disable_collection_check = false if @disable_collection_check.nil? + end + raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection') - def configure(conf) - super + if remove_tag_prefix = conf['remove_tag_prefix'] + @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) + end - if conf.has_key?('tag_mapped') - @tag_mapped = true - @disable_collection_check = true if @disable_collection_check.nil? - else - @disable_collection_check = false if @disable_collection_check.nil? - end - raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection') + @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields - if remove_tag_prefix = conf['remove_tag_prefix'] - @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) - end + if conf.has_key?('capped') + raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') + @collection_options[:capped] = true + @collection_options[:size] = Config.size_value(conf['capped_size']) + @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') + end - @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields + @connection_options[:w] = @write_concern unless @write_concern.nil? - if conf.has_key?('capped') - raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') - @collection_options[:capped] = true - @collection_options[:size] = Config.size_value(conf['capped_size']) - @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') - end + # MongoDB uses BSON's Date for time. + def @timef.format_nocache(time) + time + end - @connection_options[:w] = @write_concern unless @write_concern.nil? - - # MongoDB uses BSON's Date for time. - def @timef.format_nocache(time) - time + $log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" end - $log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" - end + def start + # Non tag mapped mode, we can check collection configuration before server start. + get_or_create_collection(@collection) unless @tag_mapped - def start - # Non tag mapped mode, we can check collection configuration before server start. - get_or_create_collection(@collection) unless @tag_mapped + # From configure for avoding complex method dependency... + @buffer.buffer_chunk_limit = available_buffer_chunk_limit - # From configure for avoding complex method dependency... - @buffer.buffer_chunk_limit = available_buffer_chunk_limit + super + end - super - end + def shutdown + # Mongo::Connection checks alive or closed myself + @clients.values.each { |client| client.db.connection.close } + super + end - def shutdown - # Mongo::Connection checks alive or closed myself - @clients.values.each { |client| client.db.connection.close } - super - end + def format(tag, time, record) + [time, record].to_msgpack + end - def format(tag, time, record) - [time, record].to_msgpack - end + def emit(tag, es, chain) + # TODO: Should replacement using eval in configure? + if @tag_mapped + super(tag, es, chain, tag) + else + super(tag, es, chain) + end + end - def emit(tag, es, chain) - # TODO: Should replacement using eval in configure? - if @tag_mapped - super(tag, es, chain, tag) - else - super(tag, es, chain) + def write(chunk) + # TODO: See emit comment + collection_name = @tag_mapped ? chunk.key : @collection + operate(get_or_create_collection(collection_name), collect_records(chunk)) end - end - def write(chunk) - # TODO: See emit comment - collection_name = @tag_mapped ? chunk.key : @collection - operate(get_or_create_collection(collection_name), collect_records(chunk)) - end + private - private + INSERT_ARGUMENT = {:collect_on_error => true} + BROKEN_DATA_KEY = '__broken_data' - INSERT_ARGUMENT = {:collect_on_error => true} - BROKEN_DATA_KEY = '__broken_data' - - def operate(collection, records) - begin - record_ids, error_records = collection.insert(records, INSERT_ARGUMENT) - if !@ignore_invalid_record and error_records.size > 0 - operate_invalid_records(collection, error_records) + def operate(collection, records) + begin + record_ids, error_records = collection.insert(records, INSERT_ARGUMENT) + if !@ignore_invalid_record and error_records.size > 0 + operate_invalid_records(collection, error_records) + end + rescue Mongo::OperationFailure => e + # Probably, all records of _records_ are broken... + if e.error_code == 13066 # 13066 means "Message contains no documents" + operate_invalid_records(collection, records) unless @ignore_invalid_record + else + raise e + end end - rescue Mongo::OperationFailure => e - # Probably, all records of _records_ are broken... - if e.error_code == 13066 # 13066 means "Message contains no documents" - operate_invalid_records(collection, records) unless @ignore_invalid_record - else - raise e - end + records end - records - end - def operate_invalid_records(collection, records) - converted_records = records.map { |record| - new_record = {} - new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key - new_record[@time_key] = record.delete(@time_key) - if @exclude_broken_fields - @exclude_broken_fields.each { |key| - new_record[key] = record.delete(key) - } - end - new_record[BROKEN_DATA_KEY] = BSON::Binary.new(Marshal.dump(record)) - new_record - } - collection.insert(converted_records) - end + def operate_invalid_records(collection, records) + converted_records = records.map { |record| + new_record = {} + new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key + new_record[@time_key] = record.delete(@time_key) + if @exclude_broken_fields + @exclude_broken_fields.each { |key| + new_record[key] = record.delete(key) + } + end + new_record[BROKEN_DATA_KEY] = BSON::Binary.new(Marshal.dump(record)) + new_record + } + collection.insert(converted_records) + end - def collect_records(chunk) - records = [] - chunk.msgpack_each { |time, record| - record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key - records << record - } - records - end + def collect_records(chunk) + records = [] + chunk.msgpack_each { |time, record| + record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key + records << record + } + records + end - FORMAT_COLLECTION_NAME_RE = /(^\.+)|(\.+$)/ + FORMAT_COLLECTION_NAME_RE = /(^\.+)|(\.+$)/ - def format_collection_name(collection_name) - formatted = collection_name - formatted = formatted.gsub(@remove_tag_prefix, '') if @remove_tag_prefix - formatted = formatted.gsub(FORMAT_COLLECTION_NAME_RE, '') - formatted = @collection if formatted.size == 0 # set default for nil tag - formatted - end + def format_collection_name(collection_name) + formatted = collection_name + formatted = formatted.gsub(@remove_tag_prefix, '') if @remove_tag_prefix + formatted = formatted.gsub(FORMAT_COLLECTION_NAME_RE, '') + formatted = @collection if formatted.size == 0 # set default for nil tag + formatted + end - def get_or_create_collection(collection_name) - collection_name = format_collection_name(collection_name) - return @clients[collection_name] if @clients[collection_name] + def get_or_create_collection(collection_name) + collection_name = format_collection_name(collection_name) + return @clients[collection_name] if @clients[collection_name] - @db ||= get_connection - if @db.collection_names.include?(collection_name) - collection = @db.collection(collection_name) - unless @disable_collection_check - capped = collection.capped? - unless @collection_options[:capped] == capped # TODO: Verify capped configuration - new_mode = format_collection_mode(@collection_options[:capped]) - old_mode = format_collection_mode(capped) - raise ConfigError, "New configuration is different from existing collection: new = #{new_mode}, old = #{old_mode}" + @db ||= get_connection + if @db.collection_names.include?(collection_name) + collection = @db.collection(collection_name) + unless @disable_collection_check + capped = collection.capped? + unless @collection_options[:capped] == capped # TODO: Verify capped configuration + new_mode = format_collection_mode(@collection_options[:capped]) + old_mode = format_collection_mode(capped) + raise ConfigError, "New configuration is different from existing collection: new = #{new_mode}, old = #{old_mode}" + end end + else + collection = @db.create_collection(collection_name, @collection_options) end - else - collection = @db.create_collection(collection_name, @collection_options) + + @clients[collection_name] = collection end - @clients[collection_name] = collection - end + def format_collection_mode(mode) + mode ? 'capped' : 'normal' + end - def format_collection_mode(mode) - mode ? 'capped' : 'normal' - end + def get_connection + db = Mongo::MongoClient.new(@host, @port, @connection_options).db(@database) + authenticate(db) + end - def get_connection - db = Mongo::MongoClient.new(@host, @port, @connection_options).db(@database) - authenticate(db) - end + # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. + LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 + LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 - # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. - LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 - LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 + def available_buffer_chunk_limit + begin + limit = mongod_version >= "1.8.0" ? LIMIT_AFTER_v1_8 : LIMIT_BEFORE_v1_8 + rescue Mongo::ConnectionFailure => e + $log.fatal "Failed to connect to 'mongod'. Please restart 'fluentd' after 'mongod' started: #{e}" + exit! + rescue Mongo::OperationFailure => e + $log.fatal "Operation failed. Probably, 'mongod' needs an authentication: #{e}" + exit! + rescue Exception => e + $log.warn "mongo unknown error #{e}, set #{LIMIT_BEFORE_v1_8} to chunk limit" + limit = LIMIT_BEFORE_v1_8 + end - def available_buffer_chunk_limit - begin - limit = mongod_version >= "1.8.0" ? LIMIT_AFTER_v1_8 : LIMIT_BEFORE_v1_8 - rescue Mongo::ConnectionFailure => e - $log.fatal "Failed to connect to 'mongod'. Please restart 'fluentd' after 'mongod' started: #{e}" - exit! - rescue Mongo::OperationFailure => e - $log.fatal "Operation failed. Probably, 'mongod' needs an authentication: #{e}" - exit! - rescue Exception => e - $log.warn "mongo unknown error #{e}, set #{LIMIT_BEFORE_v1_8} to chunk limit" - limit = LIMIT_BEFORE_v1_8 + if @buffer.buffer_chunk_limit > limit + $log.warn ":buffer_chunk_limit(#{@buffer.buffer_chunk_limit}) is large. Reset :buffer_chunk_limit with #{limit}" + limit + else + @buffer.buffer_chunk_limit + end end - if @buffer.buffer_chunk_limit > limit - $log.warn ":buffer_chunk_limit(#{@buffer.buffer_chunk_limit}) is large. Reset :buffer_chunk_limit with #{limit}" - limit - else - @buffer.buffer_chunk_limit - end - end + def mongod_version + version = nil - def mongod_version - db = get_connection - db.command('buildInfo' => 1)['version'] - end -end + begin + version = get_connection.command('buildInfo' => 1)['version'] + rescue Mongo::OperationFailure + # fallback for buggy mongod version support + version = authenticate(Mongo::MongoClient.new(@host, @port, @connection_options).db('admin')).command('buildInfo' => 1)['version'] + end - + version + end + end end