require 'fluent/output' module Fluent class MongoOutput < BufferedOutput Plugin.register_output('mongo', self) require 'fluent/plugin/mongo_util' include MongoUtil include SetTagKeyMixin config_set_default :include_tag_key, false include SetTimeKeyMixin config_set_default :include_time_key, true config_param :database, :string config_param :collection, :string, :default => 'untagged' config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 27017 config_param :ignore_invalid_record, :bool, :default => false, :deprecated => "This parameter will be ignored since v0.8 because mongo driver 2.x doesn't support base functionality for this parameter" config_param :disable_collection_check, :bool, :default => nil config_param :exclude_broken_fields, :string, :default => nil config_param :write_concern, :integer, :default => nil config_param :journaled, :bool, :default => false config_param :socket_pool_size, :integer, :default => 1 config_param :replace_dot_in_key_with, :string, :default => nil config_param :replace_dollar_in_key_with, :string, :default => nil # tag mapping mode config_param :tag_mapped, :bool, :default => false config_param :remove_tag_prefix, :string, :default => nil # SSL connection config_param :ssl, :bool, :default => false config_param :ssl_cert, :string, :default => nil config_param :ssl_key, :string, :default => nil config_param :ssl_key_pass_phrase, :string, :default => nil, :secret => true config_param :ssl_verify, :bool, :default => false config_param :ssl_ca_cert, :string, :default => nil # For older (1.7 or earlier) MongoDB versions config_param :mongodb_smaller_bson_limit, :bool, :default => false attr_reader :collection_options, :connection_options unless method_defined?(:log) define_method(:log) { $log } end def initialize super require 'mongo' require 'msgpack' @clients = {} @connection_options = {} @collection_options = {:capped => false} end # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 def configure(conf) if conf.has_key?('buffer_chunk_limit') configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit']) estimated_limit_size = LIMIT_AFTER_v1_8 estimated_limit_size_conf = '8m' if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) estimated_limit_size = LIMIT_BEFORE_v1_8 estimated_limit_size_conf = '2m' end if configured_chunk_limit_size > estimated_limit_size log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}" conf['buffer_chunk_limit'] = estimated_limit_size_conf end else if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) conf['buffer_chunk_limit'] = '2m' else conf['buffer_chunk_limit'] = '8m' end end super unless @ignore_invalid_record log.warn "Since v0.8, invalid record detection will be removed because mongo driver v2.x and API spec don't provide it. You may lose invalid records, so you should not send such records to mongo plugin" end if conf.has_key?('tag_mapped') @tag_mapped = true @disable_collection_check = true if @disable_collection_check.nil? else @disable_collection_check = false if @disable_collection_check.nil? end raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection') if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end @exclude_broken_fields = @exclude_broken_fields.split(',') if @exclude_broken_fields if conf.has_key?('capped') raise ConfigError, "'capped_size' parameter is required on of Mongo output" unless conf.has_key?('capped_size') @collection_options[:capped] = true @collection_options[:size] = Config.size_value(conf['capped_size']) @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') end @connection_options[:w] = @write_concern unless @write_concern.nil? @connection_options[:j] = @journaled @connection_options[:pool_size] = @socket_pool_size @connection_options[:ssl] = @ssl if @ssl @connection_options[:ssl_cert] = @ssl_cert @connection_options[:ssl_key] = @ssl_key @connection_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase @connection_options[:ssl_verify] = @ssl_verify @connection_options[:ssl_ca_cert] = @ssl_ca_cert end # MongoDB uses BSON's Date for time. def @timef.format_nocache(time) time end $log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" end def start # Non tag mapped mode, we can check collection configuration before server start. get_or_create_collection(@collection) unless @tag_mapped super end def shutdown # Mongo::Connection checks alive or closed myself @clients.values.each { |client| client.db.connection.close } super end def format(tag, time, record) [time, record].to_msgpack end def emit(tag, es, chain) # TODO: Should replacement using eval in configure? if @tag_mapped super(tag, es, chain, tag) else super(tag, es, chain) end end def write(chunk) # TODO: See emit comment collection_name = @tag_mapped ? chunk.key : @collection operate(get_or_create_collection(collection_name), collect_records(chunk)) end private INSERT_ARGUMENT = {:collect_on_error => true} BROKEN_DATA_KEY = '__broken_data' def operate(collection, records) begin if @replace_dot_in_key_with records.map! do |r| replace_key_of_hash(r, ".", @replace_dot_in_key_with) end end if @replace_dollar_in_key_with records.map! do |r| replace_key_of_hash(r, /^\$/, @replace_dollar_in_key_with) end end record_ids, error_records = collection.insert(records, INSERT_ARGUMENT) if !@ignore_invalid_record and error_records.size > 0 operate_invalid_records(collection, error_records) end rescue Mongo::OperationFailure => e # Probably, all records of _records_ are broken... if e.error_code == 13066 # 13066 means "Message contains no documents" operate_invalid_records(collection, records) unless @ignore_invalid_record else raise e end end records end def operate_invalid_records(collection, records) converted_records = records.map { |record| new_record = {} new_record[@tag_key] = record.delete(@tag_key) if @include_tag_key new_record[@time_key] = record.delete(@time_key) if @exclude_broken_fields @exclude_broken_fields.each { |key| new_record[key] = record.delete(key) } end new_record[BROKEN_DATA_KEY] = BSON::Binary.new(Marshal.dump(record)) new_record } collection.insert(converted_records) end def collect_records(chunk) records = [] chunk.msgpack_each { |time, record| record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key records << record } records end FORMAT_COLLECTION_NAME_RE = /(^\.+)|(\.+$)/ def format_collection_name(collection_name) formatted = collection_name formatted = formatted.gsub(@remove_tag_prefix, '') if @remove_tag_prefix formatted = formatted.gsub(FORMAT_COLLECTION_NAME_RE, '') formatted = @collection if formatted.size == 0 # set default for nil tag formatted end def get_or_create_collection(collection_name) collection_name = format_collection_name(collection_name) return @clients[collection_name] if @clients[collection_name] @db ||= get_connection if @db.collection_names.include?(collection_name) collection = @db.collection(collection_name) unless @disable_collection_check capped = collection.capped? unless @collection_options[:capped] == capped # TODO: Verify capped configuration new_mode = format_collection_mode(@collection_options[:capped]) old_mode = format_collection_mode(capped) raise ConfigError, "New configuration is different from existing collection: new = #{new_mode}, old = #{old_mode}" end end else collection = @db.create_collection(collection_name, @collection_options) end @clients[collection_name] = collection end def format_collection_mode(mode) mode ? 'capped' : 'normal' end def get_connection db = Mongo::MongoClient.new(@host, @port, @connection_options).db(@database) authenticate(db) end def replace_key_of_hash(hash_or_array, pattern, replacement) case hash_or_array when Array hash_or_array.map do |elm| replace_key_of_hash(elm, pattern, replacement) end when Hash result = Hash.new hash_or_array.each_pair do |k, v| k = k.gsub(pattern, replacement) if v.is_a?(Hash) || v.is_a?(Array) result[k] = replace_key_of_hash(v, pattern, replacement) else result[k] = v end end result else hash_or_array end end end end