require 'fluent/input'

module Fluent
  # Fluentd input plugin that repeatedly queries a MongoDB collection for
  # documents whose `_id` is greater than the last one seen and emits them
  # as events. The last seen id can optionally be persisted to a file
  # (`id_store_file`) so that tailing resumes from it after a restart.
  class MongoTailInput < Input
    Plugin.register_input('mongo_tail', self)

    unless method_defined?(:log)
      define_method(:log) { $log }
    end

    # Define `router` method of v0.12 to support v0.10 or earlier
    unless method_defined?(:router)
      define_method("router") { ::Fluent::Engine }
    end

    require 'fluent/plugin/mongo_auth'
    include MongoAuthParams
    include MongoAuth
    require 'fluent/plugin/logger_support'
    include LoggerSupport

    desc "MongoDB database"
    config_param :database, :string, default: nil
    desc "MongoDB collection"
    config_param :collection, :string
    desc "MongoDB host"
    config_param :host, :string, default: 'localhost'
    desc "MongoDB port"
    config_param :port, :integer, default: 27017
    desc "Tailing interval"
    config_param :wait_time, :integer, default: 1
    desc "MongoDB node URL"
    config_param :url, :string, default: nil

    desc "Input tag"
    config_param :tag, :string, default: nil
    desc "Treat key as tag"
    config_param :tag_key, :string, default: nil
    desc "Treat key as time"
    config_param :time_key, :string, default: nil
    desc "Time format"
    config_param :time_format, :string, default: nil

    config_param :object_id_keys, :array, default: nil

    desc "To store last ObjectID"
    config_param :id_store_file, :string, default: nil

    desc "SSL connection"
    config_param :ssl, :bool, default: false

    def initialize
      super
      require 'mongo'
      require 'bson'

      @client_options = {}
      @connection_options = {}
    end

    # Validates the configuration and loads the last stored id, if any.
    #
    # @param conf the plugin configuration handed in by Fluentd
    # @raise [ConfigError] when neither `tag` nor `tag_key` is set, or when
    #   `database` and `url` are both set or both missing
    def configure(conf)
      super

      if !@tag && !@tag_key
        raise ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input"
      end

      if @database && @url
        raise ConfigError, "Both 'database' and 'url' can not be set"
      end

      if !@database && !@url
        raise ConfigError, "One of 'database' or 'url' must be specified"
      end

      @last_id = @id_store_file ? get_last_id : nil
      @connection_options[:ssl] = @ssl

      configure_logger(@mongo_log_level)
    end

    # Opens the id store file (when configured), connects to the collection
    # and starts the polling thread.
    def start
      super

      if @id_store_file
        @file = get_id_store_file
        # Opening the store truncates it, so write the id loaded in
        # `configure` straight back; otherwise a crash before the first
        # document is processed would lose the checkpoint.
        save_last_id if @last_id
      end
      @collection = get_collection

      # Resume tailing from last inserted id.
      # Because tailable option is obsoleted since mongo driver 2.0.
      unless @id_store_file
        # Scan the collection only once; keep the id as a String like every
        # other place that assigns @last_id.
        last_inserted_id = get_last_inserted_id
        @last_id = last_inserted_id.to_s if last_inserted_id
      end

      @thread = Thread.new(&method(:run))
    end

    # Stops the polling thread, persists the last seen id and releases the
    # file and the MongoDB client.
    def shutdown
      # Stop the polling thread first so it can no longer write to the id
      # store file once that file has been closed.
      @stop = true
      @thread.join if @thread

      if @id_store_file && @file
        save_last_id
        @file.close
      end

      @client.close if @client
      super
    end

    # Polling loop executed in the background thread. Returns once `@stop`
    # is set. Errors raised while querying or emitting are logged and the
    # loop retries after `wait_time` seconds instead of spinning.
    def run
      loop do
        option = {}
        begin
          loop do
            return if @stop

            option['_id'] = {'$gt' => BSON::ObjectId(@last_id)} if @last_id
            documents = @collection.find(option)
            if documents.count >= 1
              process_documents(documents)
            else
              sleep @wait_time
            end
          end
        rescue => e
          # Best effort: keep tailing, but make the failure visible and back
          # off so a persistent error does not become a busy loop.
          return if @stop

          log.warn "mongo_tail: error while tailing, retrying: #{e.class}: #{e.message}"
          sleep @wait_time
        end
      end
    end

    private

    # Builds a new Mongo::Client for the configured node.
    def client
      @client_options[:database] = @database
      @client_options[:user] = @user if @user
      @client_options[:password] = @password if @password
      # Forward the `ssl` setting to the driver; it was previously stored
      # only in @connection_options, which nothing in this class reads.
      @client_options[:ssl] = true if @ssl
      Mongo::Client.new([node_string.to_s], @client_options)
    end

    # Connects, authenticates (via `authenticate`, presumably provided by
    # MongoAuth) and returns the collection handle named by the `collection`
    # parameter.
    def get_collection
      @client = client
      @client = authenticate(@client)
      @client["#{@collection}"]
    end

    # Returns "host:port" when `database` is configured, otherwise the `url`.
    def node_string
      if @database
        "#{@host}:#{@port}"
      elsif @url
        @url
      end
    end

    # Converts the documents into events, remembers the last `_id` and emits
    # one event stream per tag.
    #
    # Events are grouped by their own tag so that, with `tag_key`, a batch
    # containing several tags is no longer emitted entirely under the tag of
    # its last document. The configured @tag is left untouched.
    def process_documents(documents)
      streams = {}

      documents.each do |doc|
        time = event_time(doc)
        tag = event_tag(doc)

        if @object_id_keys
          @object_id_keys.each { |id_key| doc[id_key] = doc[id_key].to_s }
        end

        if id = doc.delete('_id')
          @last_id = id.to_s
          doc['_id_str'] = @last_id
          save_last_id if @id_store_file
        end

        (streams[tag] ||= MultiEventStream.new).add(time, doc)
      end

      # Hashes keep insertion order, so tags are emitted in first-seen order.
      streams.each { |tag, es| router.emit_stream(tag, es) }
    end

    # Removes `time_key` from the document and returns it as an integer;
    # falls back to the engine's current time when unset or missing.
    def event_time(doc)
      return Engine.now unless @time_key

      t = doc.delete(@time_key)
      t.nil? ? Engine.now : t.to_i
    end

    # Removes `tag_key` from the document and returns its value; falls back
    # to 'mongo.missing_tag' when the key is absent, or to the configured
    # tag when `tag_key` is not set.
    def event_tag(doc)
      return @tag unless @tag_key

      t = doc.delete(@tag_key)
      t.nil? ? 'mongo.missing_tag' : t
    end

    # Returns the `_id` of the last document yielded by an unfiltered find on
    # the collection, or nil when no document carries an `_id`.
    def get_last_inserted_id
      last_inserted_id = nil
      # Iterating an empty result is a no-op, so no separate count query is
      # needed; the documents are discarded, so they are not mutated either.
      @collection.find.each do |doc|
        id = doc['_id']
        last_inserted_id = id if id
      end
      last_inserted_id
    end

    # Opens (and truncates) the id store file for writing.
    def get_id_store_file
      file = File.open(@id_store_file, 'w')
      # `file.sync` only reads the flag; assign it so every write of the
      # last id reaches the file immediately instead of sitting in a buffer.
      file.sync = true
      file
    end

    # Reads the stored ObjectId string from the id store file.
    #
    # @return [String, nil] the stored id, or nil when the file is missing,
    #   unreadable or does not contain a valid ObjectId
    def get_last_id
      return nil unless File.exist?(@id_store_file)

      BSON::ObjectId(File.read(@id_store_file).strip).to_s
    rescue StandardError
      nil
    end

    # Overwrites the id store file content with the current last id.
    def save_last_id
      @file.pos = 0
      @file.write(@last_id)
    end
  end
end