lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.8.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-1.0.0.rc1

- old
+ new

@@ -1,31 +1,28 @@ -require 'fluent/output' +require 'mongo' +require 'msgpack' +require 'fluent/plugin/output' +require 'fluent/plugin/mongo_auth' +require 'fluent/plugin/logger_support' -module Fluent - class MongoOutput < BufferedOutput - Plugin.register_output('mongo', self) +module Fluent::Plugin + class MongoOutput < Output + Fluent::Plugin.register_output('mongo', self) - unless method_defined?(:log) - define_method(:log) { $log } - end + helpers :event_emitter, :inject, :compat_parameters - require 'fluent/plugin/mongo_auth' - include MongoAuthParams - include MongoAuth - require 'fluent/plugin/logger_support' - include LoggerSupport + include Fluent::MongoAuthParams + include Fluent::MongoAuth + include Fluent::LoggerSupport - include SetTagKeyMixin - config_set_default :include_tag_key, false + DEFAULT_BUFFER_TYPE = "memory" - include SetTimeKeyMixin + config_set_default :include_tag_key, false config_set_default :include_time_key, true - desc "MongoDB connection string" - config_param :connection_string, :default => nil desc "MongoDB database" - config_param :database, :string, :default => nil + config_param :database, :string desc "MongoDB collection" config_param :collection, :string, default: 'untagged' desc "MongoDB host" config_param :host, :string, default: 'localhost' desc "MongoDB port" @@ -51,68 +48,67 @@ config_param :ssl_key, :string, default: nil config_param :ssl_key_pass_phrase, :string, default: nil, secret: true config_param :ssl_verify, :bool, default: false config_param :ssl_ca_cert, :string, default: nil + config_section :buffer do + config_set_default :@type, DEFAULT_BUFFER_TYPE + config_set_default :chunk_keys, ['tag'] + end + attr_reader :client_options, :collection_options def initialize super - require 'mongo' - require 'msgpack' - - @nodes = nil @client_options = {} @collection_options = {capped: false} end # Following limits are heuristic. BSON is sometimes bigger than MessagePack and JSON. LIMIT_BEFORE_v1_8 = 2 * 1024 * 1024 # 2MB = 4MB / 2 LIMIT_AFTER_v1_8 = 8 * 1024 * 1024 # 8MB = 16MB / 2 def configure(conf) if conf.has_key?('buffer_chunk_limit') - configured_chunk_limit_size = Config.size_value(conf['buffer_chunk_limit']) + configured_chunk_limit_size = Fluent::Config.size_value(conf['buffer_chunk_limit']) estimated_limit_size = LIMIT_AFTER_v1_8 estimated_limit_size_conf = '8m' - if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) + if conf.has_key?('mongodb_smaller_bson_limit') && Fluent::Config.bool_value(conf['mongodb_smaller_bson_limit']) estimated_limit_size = LIMIT_BEFORE_v1_8 estimated_limit_size_conf = '2m' end if configured_chunk_limit_size > estimated_limit_size log.warn ":buffer_chunk_limit(#{conf['buffer_chunk_limit']}) is large. Reset :buffer_chunk_limit with #{estimated_limit_size_conf}" conf['buffer_chunk_limit'] = estimated_limit_size_conf end else - if conf.has_key?('mongodb_smaller_bson_limit') && Config.bool_value(conf['mongodb_smaller_bson_limit']) + if conf.has_key?('mongodb_smaller_bson_limit') && Fluent::Config.bool_value(conf['mongodb_smaller_bson_limit']) conf['buffer_chunk_limit'] = '2m' else conf['buffer_chunk_limit'] = '8m' end end + compat_parameters_convert(conf, :inject) super - if @connection_string.nil? && @database.nil? - raise Fluent::ConfigError, "connection_string or database parameter is required" - end - unless @ignore_invalid_record log.warn "Since v0.8, invalid record detection will be removed because mongo driver v2.x and API spec don't provide it. You may lose invalid records, so you should not send such records to mongo plugin" end if conf.has_key?('tag_mapped') - @tag_mapped = true + log.warn "'tag_mapped' feature is replaced with built-in config placeholder. Please consider to use 'collection ${tag}'." + @collection = '${tag}' end - raise ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection') + raise Fluent::ConfigError, "normal mode requires collection parameter" if !@tag_mapped and !conf.has_key?('collection') if conf.has_key?('capped') - raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') + raise Fluent::ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') @collection_options[:capped] = true - @collection_options[:size] = Config.size_value(conf['capped_size']) - @collection_options[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') + @collection_options[:size] = Fluent::Config.size_value(conf['capped_size']) + @collection_options[:max] = Fluent::Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') end if remove_tag_prefix = conf['remove_tag_prefix'] @remove_tag_prefix = Regexp.new('^' + Regexp.escape(remove_tag_prefix)) end @@ -126,17 +122,11 @@ @client_options[:ssl_key] = @ssl_key @client_options[:ssl_key_pass_phrase] = @ssl_key_pass_phrase @client_options[:ssl_verify] = @ssl_verify @client_options[:ssl_ca_cert] = @ssl_ca_cert end - @nodes = ["#{@host}:#{@port}"] if @nodes.nil? - # MongoDB uses BSON's Date for time. - def @timef.format_nocache(time) - time - end - configure_logger(@mongo_log_level) log.debug "Setup mongo configuration: mode = #{@tag_mapped ? 'tag mapped' : 'normal'}" end @@ -161,31 +151,39 @@ def format(tag, time, record) [time, record].to_msgpack end + def formatted_to_msgpack_binary + true + end + + def multi_workers_ready? + true + end + def write(chunk) - collection_name = @tag_mapped ? chunk.key : @collection + collection_name = extract_placeholders(@collection, chunk.metadata) operate(format_collection_name(collection_name), collect_records(chunk)) end private def client - if @connection_string - Mongo::Client.new(@connection_string) - else - @client_options[:database] = @database - @client_options[:user] = @user if @user - @client_options[:password] = @password if @password - Mongo::Client.new(@nodes, @client_options) - end + @client_options[:database] = @database + @client_options[:user] = @user if @user + @client_options[:password] = @password if @password + Mongo::Client.new(["#{@host}:#{@port}"], @client_options) end def collect_records(chunk) records = [] + time_key = @inject_config.time_key + tag = chunk.metadata.tag chunk.msgpack_each {|time, record| - record[@time_key] = Time.at(time || record[@time_key]) if @include_time_key + record = inject_values_to_record(tag, time, record) + # MongoDB uses BSON's Date for time. + record[time_key] = Time.at(time || record[time_key]) if time_key records << record } records end