lib/logstash/test.rb in logstash-input-mongoprofile-0.1.15 vs lib/logstash/test.rb in logstash-input-mongoprofile-0.1.16
- old
+ new
@@ -1,3 +1,176 @@
-require('date')
-a = DateTime.parse("2017-06-22 08:14:12 UTC\n")
-puts a
\ No newline at end of file
+require 'mongo'
+
+class Test
+ def each
+ arr = [1, 2, 3]
+
+ arr.each do |i|
+ yield i
+ end
+
+ puts 'end each'
+ end
+end
+
+test = Test.new
+
+test.each do |i|
+ puts i
+end
+
+class EventMock
+ def set(arg)
+ puts arg
+ end
+end
+
+class LoggerMock
+ def info(arg)
+ puts arg
+ end
+
+ def debug(arg)
+ puts arg
+ end
+end
+
+
+class MongoAccessor
+ def initialize(url, collection, client_host)
+ connection = Mongo::Client.new(url)
+
+ @mongodb = connection.database
+ @collection = @mongodb.collection(collection)
+ @client_host = client_host
+ end
+
+ def get_documents_by_ts(date, limit)
+ @collection.find({:ts => {:$gt => DateTime.parse(date) + 0.00002}, :client => {:$ne => @client_host}}).limit(limit).sort(:ts => 1)
+ end
+
+ def get_documents(limit)
+ @collection.find({:client => {:$ne => @client_host}}).limit(limit).sort(:ts => 1)
+ end
+end
+
+class ProfileCollection
+ def initialize(documents, parser, generate_id)
+ @generate_id = generate_id
+ @documents = []
+
+ documents.each do |document|
+ @documents.push(document)
+ puts document['ts']
+ end
+
+ @parser = parser
+ end
+
+ def each
+ @documents.each do |document|
+ if @generate_id
+ document['_id'] = generate_id.to_s
+ end
+
+ yield @parser.parse(document)
+ end
+
+ @documents = []
+ end
+
+ def get_last_document_date
+ if @documents != nil and @documents[-1] != nil
+ @documents[-1]['ts']
+ else
+ nil
+ end
+ end
+
+ def length
+ @documents.length
+ end
+
+ private
+ def generate_id
+ # noinspection RubyArgCount
+ BSON::ObjectId.new
+ end
+end
+
+class DocumentParser
+ def initialize(host, logger)
+ @host = host
+ @logger = logger
+ end
+
+ def parse(document)
+ @logger.info('Start documents parsing')
+ event = LogStash::Event.new('host' => @host)
+
+ document.each do |key, value|
+ @logger.debug("Try set event field key: #{key} value: #{value}")
+ event.set(key, value)
+ end
+
+ event
+ end
+end
+
+class LastValueStore
+ def initialize(path, name)
+ @file_full_name = "#{path}/#{name}"
+ end
+
+ def save_last_value(value)
+ file = File.open(@file_full_name, 'a+')
+
+ file.truncate(0)
+ file.puts(value)
+
+ file.close
+ end
+
+ def get_last_value
+ File.read(@file_full_name)
+ end
+end
+
+class Controller
+ def initialize(event, url, collection, limit, path, client_host, logger, generate_id)
+ @mongo_accessor = MongoAccessor.new(url, collection, client_host)
+ @last_value_store = LastValueStore.new(path, collection)
+ @document_parser = DocumentParser.new(event, logger)
+ @generate_id = generate_id
+ @limit = limit
+ @logger = logger
+ end
+
+ def get_next_events
+ last_date_value = @last_value_store.get_last_value
+
+ if last_date_value == ''
+ @logger.info('Getting documents from mongo first time')
+ documents = @mongo_accessor.get_documents(@limit)
+ else
+ @logger.info("Getting documents from mongo start at #{last_date_value}")
+ documents = @mongo_accessor.get_documents_by_ts(last_date_value, @limit)
+ end
+
+ profile_collection = ProfileCollection.new(documents, @document_parser, @generate_id)
+
+ if profile_collection.get_last_document_date != nil
+ @last_value_store.save_last_value(profile_collection.get_last_document_date)
+ else
+ @logger.info('Nothing to get...')
+ end
+
+ profile_collection
+ end
+end
+
+controller = Controller.new(EventMock.new, 'mongodb://192.168.1.37/eleet-v2-dev', 'system.profile', 10, '/home/artem/', '192.168.1.35', LoggerMock.new, false)
+
+while true
+ events = controller.get_next_events
+ puts events.length
+end