lib/logstash/inputs/mongoprofile.rb in logstash-input-mongoprofile-0.1.2 vs lib/logstash/inputs/mongoprofile.rb in logstash-input-mongoprofile-0.1.3

- old
+ new

@@ -1,11 +1,11 @@ # encoding: utf-8 require "logstash/inputs/base" require "logstash/namespace" require "stud/interval" require "socket" # for Socket.gethostname -require '../../../lib/mongo/mongo' +require "mongo" # Generate a repeating message. # # This plugin is intented only as an example. @@ -51,5 +51,107 @@ # * close sockets (unblocking blocking reads/accepts) # * cleanup temporary files # * terminate spawned threads end end # class LogStash::Inputs::Mongoprofile + +class MongoAccessor + def initialize(url, collection, client_host) + connection = Mongo::Client.new(url) + + @mongodb = connection.database + @collection = @mongodb.collection(collection) + @client_host = client_host + end + + def get_documents_by_ts(date, limit) + @collection.find({:ts => {:$gt => date}, :client => {:$ne => @client_host}}).limit(limit) + end + + def get_documents(limit) + @collection.find({:client => {:$ne => @client_host}}).limit(limit) + end +end + +class ProfileCollection + def initialize(documents, parser) + @documents = documents + @parser = parser + end + + def each + @documents.each do |document| + document['_id'] = generate_id + yield @parser.parse(document) + end + end + + def get_last_document_date + @documents[-1]['ts'] + end + + private + def generate_id + # noinspection RubyArgCount + BSON::ObjectId.new + end +end + +class DocumentParser + def initialize(host) + @host = host + end + + def parse(document) + event = LogStash::Event.new('host' => @host) + + document.each do |key, value| + event.set(key, value) + end + + event + end +end + +class LastValueStore + def initialize(path, name) + @file_full_name = "#{path}/#{name}" + end + + def save_last_value(value) + file = File.open(@file_full_name, 'a+') + + file.truncate(0) + file.puts(value) + + file.close + end + + def get_last_value + File.read(@file_full_name) + end +end + +class Controller + def initialize(event, url, collection, limit, path, client_host) + @mongo_accessor = MongoAccessor.new(url, collection, client_host) + @last_value_store = LastValueStore.new(path, collection) + @document_parser = DocumentParser.new(event) + @limit = limit + end + + def get_next_events + last_date_value = @last_value_store.get_last_value + + if last_date_value == '' + documents = @mongo_accessor.get_documents(@limit) + else + documents = @mongo_accessor.get_documents_by_ts(last_date_value, @limit) + end + + profile_collection = ProfileCollection.new(documents, @document_parser) + + @last_value_store.save_last_value(profile_collection.get_last_document_date) + + profile_collection + end +end \ No newline at end of file