lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.3.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.4.0

- old
+ new

@@ -2,84 +2,88 @@ class MongoOutput < BufferedOutput Fluent::Plugin.register_output('mongo', self) + include SetTagKeyMixin + config_set_default :include_tag_key, false + + include SetTimeKeyMixin + config_set_default :include_time_key, true + + config_param :database, :string + config_param :collection, :string + config_param :host, :string, :default => 'localhost' + config_param :port, :integer, :default => 27017 + + attr_reader :argument + def initialize super require 'mongo' require 'msgpack' - # Sub-class can overwrite following parameters - @database_name = nil - @collection_name = nil + @argument = {:capped => false} end def configure(conf) super - @database_name = conf['database'] if conf.has_key?('database') - @collection_name = conf['collection'] if conf.has_key?('collection') - raise ConfigError, "'database' and 'collection' parameter is required on mongo output" if @database_name.nil? || @collection_name.nil? - @host, @port = host_and_port(conf) - # capped configuration - @argument = {:capped => false} - if conf['capped'] + if conf.has_key?('capped') raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size') @argument[:capped] = true @argument[:size] = Config.size_value(conf['capped_size']) @argument[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max') end + + # MongoDB uses BSON's Date for time. + def @timef.format_nocache(time) + time + end end def start super - @collection = get_or_create_collection #Mongo::Connection.new(@host, @port).db(@database_name).collection(@collection_name) + @client = get_or_create_collection end def shutdown # Mongo::Connection checks alive or closed myself - @collection.db.connection.close + @client.db.connection.close super end - def format(tag, event) - event.record.to_msgpack + def format(tag, time, record) + record.to_msgpack end def write(chunk) records = [] - chunk.open { |io| - begin - MessagePack::Unpacker.new(io).each { |record| records << record } - rescue EOFError - # EOFError always occured when reached end of chunk. - end + chunk.msgpack_each { |record| + record[@time_key] = Time.at(record[@time_key]) if @include_time_key + records << record } - - @collection.insert(records) + operate(records) end private - def host_and_port(conf) - host = conf['host'] || 'localhost' - port = conf['port'] || 27017 - [host, Integer(port)] - end - def get_or_create_collection - db = Mongo::Connection.new(@host, @port).db(@database_name) - if db.collection_names.include?(@collection_name) - collection = db.collection(@collection_name) + db = Mongo::Connection.new(@host, @port).db(@database) + if db.collection_names.include?(@collection) + collection = db.collection(@collection) return collection if @argument[:capped] == collection.capped? # TODO: Verify capped configuration # raise Exception if old collection does not match lastest configuration raise ConfigError, "New configuration is different from existing collection" end - db.create_collection(@collection_name, @argument) + db.create_collection(@collection, @argument) + end + + def operate(records) + @client.insert(records) end end end