lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.3.1 vs lib/fluent/plugin/out_mongo.rb in fluent-plugin-mongo-0.4.0
- old
+ new
@@ -2,84 +2,88 @@
class MongoOutput < BufferedOutput
Fluent::Plugin.register_output('mongo', self)
+ include SetTagKeyMixin
+ config_set_default :include_tag_key, false
+
+ include SetTimeKeyMixin
+ config_set_default :include_time_key, true
+
+ config_param :database, :string
+ config_param :collection, :string
+ config_param :host, :string, :default => 'localhost'
+ config_param :port, :integer, :default => 27017
+
+ attr_reader :argument
+
def initialize
super
require 'mongo'
require 'msgpack'
- # Sub-class can overwrite following parameters
- @database_name = nil
- @collection_name = nil
+ @argument = {:capped => false}
end
def configure(conf)
super
- @database_name = conf['database'] if conf.has_key?('database')
- @collection_name = conf['collection'] if conf.has_key?('collection')
- raise ConfigError, "'database' and 'collection' parameter is required on mongo output" if @database_name.nil? || @collection_name.nil?
- @host, @port = host_and_port(conf)
-
# capped configuration
- @argument = {:capped => false}
- if conf['capped']
+ if conf.has_key?('capped')
raise ConfigError, "'capped_size' parameter is required on <store> of Mongo output" unless conf.has_key?('capped_size')
@argument[:capped] = true
@argument[:size] = Config.size_value(conf['capped_size'])
@argument[:max] = Config.size_value(conf['capped_max']) if conf.has_key?('capped_max')
end
+
+ # MongoDB uses BSON's Date for time.
+ def @timef.format_nocache(time)
+ time
+ end
end
def start
super
- @collection = get_or_create_collection #Mongo::Connection.new(@host, @port).db(@database_name).collection(@collection_name)
+ @client = get_or_create_collection
end
def shutdown
# Mongo::Connection checks alive or closed myself
- @collection.db.connection.close
+ @client.db.connection.close
super
end
- def format(tag, event)
- event.record.to_msgpack
+ def format(tag, time, record)
+ record.to_msgpack
end
def write(chunk)
records = []
- chunk.open { |io|
- begin
- MessagePack::Unpacker.new(io).each { |record| records << record }
- rescue EOFError
- # EOFError always occured when reached end of chunk.
- end
+ chunk.msgpack_each { |record|
+ record[@time_key] = Time.at(record[@time_key]) if @include_time_key
+ records << record
}
-
- @collection.insert(records)
+ operate(records)
end
private
- def host_and_port(conf)
- host = conf['host'] || 'localhost'
- port = conf['port'] || 27017
- [host, Integer(port)]
- end
-
def get_or_create_collection
- db = Mongo::Connection.new(@host, @port).db(@database_name)
- if db.collection_names.include?(@collection_name)
- collection = db.collection(@collection_name)
+ db = Mongo::Connection.new(@host, @port).db(@database)
+ if db.collection_names.include?(@collection)
+ collection = db.collection(@collection)
return collection if @argument[:capped] == collection.capped? # TODO: Verify capped configuration
# raise Exception if old collection does not match lastest configuration
raise ConfigError, "New configuration is different from existing collection"
end
- db.create_collection(@collection_name, @argument)
+ db.create_collection(@collection, @argument)
+ end
+
+ def operate(records)
+ @client.insert(records)
end
end
end