require 'fluent/input' module Fluent class MongoTailInput < Input Plugin.register_input('mongo_tail', self) require 'fluent/plugin/mongo_util' include MongoUtil config_param :database, :string, :default => nil config_param :collection, :string config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 27017 config_param :wait_time, :integer, :default => 1 config_param :url, :string, :default => nil config_param :tag, :string, :default => nil config_param :tag_key, :string, :default => nil config_param :time_key, :string, :default => nil config_param :time_format, :string, :default => nil # To store last ObjectID config_param :id_store_file, :string, :default => nil config_param :id_store_collection, :string, :default => nil # SSL connection config_param :ssl, :bool, :default => false unless method_defined?(:log) define_method(:log) { $log } end def initialize super require 'mongo' require 'bson' @connection_options = {} end def configure(conf) super if !@tag and !@tag_key raise ConfigError, "'tag' or 'tag_key' option is required on mongo_tail input" end if @database && @url raise ConfigError, "Both 'database' and 'url' can not be set" end if !@database && !@url raise ConfigError, "One of 'database' or 'url' must be specified" end @last_id = get_last_id @connection_options[:ssl] = @ssl $log.debug "Setup mongo_tail configuration: mode = #{@id_store_file || @id_store_collection ? 'persistent' : 'non-persistent'}, last_id = #{@last_id}" end def start super open_id_storage @client = get_capped_collection @thread = Thread.new(&method(:run)) end def shutdown save_last_id(@last_id) unless @last_id close_id_storage @stop = true @thread.join @client.db.connection.close super end def run loop { cursor = Mongo::Cursor.new(@client, cursor_conf) begin loop { return if @stop cursor = Mongo::Cursor.new(@client, cursor_conf) unless cursor.alive? if doc = cursor.next_document process_document(doc) else sleep @wait_time end } rescue # ignore Mongo::OperationFailuer at CURSOR_NOT_FOUND end } end private def get_capped_collection begin db = get_database raise ConfigError, "'#{database_name}.#{@collection}' not found: node = #{node_string}" unless db.collection_names.include?(@collection) collection = db.collection(@collection) raise ConfigError, "'#{database_name}.#{@collection}' is not capped: node = #{node_string}" unless collection.capped? collection rescue Mongo::ConnectionFailure => e log.fatal "Failed to connect to 'mongod'. Please restart 'fluentd' after 'mongod' started: #{e}" exit! rescue Mongo::OperationFailure => e log.fatal "Operation failed. Probably, 'mongod' needs an authentication: #{e}" exit! end end def get_database case when @database authenticate(Mongo::Connection.new(@host, @port, @connection_options).db(@database)) when @url parser = Mongo::URIParser.new(@url) parser.connection.db(parser.db_name) end end def database_name case when @database @database when @url Mongo::URIParser.new(@url).db_name end end def node_string case when @database "#{@host}:#{@port}" when @url @url end end def process_document(doc) time = if @time_key t = doc.delete(@time_key) t.nil? ? Engine.now : t.to_i else Engine.now end tag = if @tag_key t = doc.delete(@tag_key) t.nil? ? 'mongo.missing_tag' : t else @tag end if id = doc.delete('_id') @last_id = id.to_s doc['_id_str'] = @last_id save_last_id(@last_id) end # Should use MultiEventStream? router.emit(tag, time, doc) end def cursor_conf conf = {} conf[:tailable] = true conf[:selector] = {'_id' => {'$gt' => BSON::ObjectId(@last_id)}} if @last_id conf end # following methods are used to read/write last_id def open_id_storage if @id_store_file @id_storage = File.open(@id_store_file, 'w') @id_storege.sync end if @id_store_collection @id_storage = get_database.collection(@id_store_collection) end end def close_id_storage if @id_storage.is_a?(File) @id_storage.close end end def get_last_id begin if @id_store_file && File.exist?(@id_store_file) return BSON::ObjectId(File.read(@id_store_file)).to_s end if @id_store_collection collection = get_database.collection(@id_store_collection) count = collection.find.count doc = collection.find.skip(count - 1).limit(1).first return doc && doc["last_id"] end rescue nil end end def save_last_id(last_id) if @id_storage.is_a?(File) @id_storage.pos = 0 @id_storage.write(last_id) end if @id_storage.is_a?(Mongo::Collection) @id_storage.insert("last_id" => last_id) end end end end