lib/agent/agent.rb in liri-0.3.1 vs lib/agent/agent.rb in liri-0.4.0
- old
+ new
@@ -14,43 +14,43 @@
def run(work_folder_path, stop = false)
setup_manager = Liri.set_setup(work_folder_path, :agent)
agent_folder_path = setup_manager.agent_folder_path
Liri.set_logger(setup_manager.logs_folder_path, 'liriagent.log')
- Liri.logger.info("Agent process started")
+ Liri.logger.info("Agent process started", true)
Liri.logger.info("Press Ctrl + c to finish Agent process manually\n", true)
- decompressed_source_code_path = File.join(agent_folder_path, '/', Common::SourceCode::DECOMPRESSED_FOLDER_NAME)
- source_code = Common::SourceCode.new(decompressed_source_code_path, agent_folder_path, Liri.compression_class, Liri.unit_test_class)
- runner = Agent::Runner.new(Liri.unit_test_class, source_code.decompressed_file_folder_path)
- tests_result = Common::TestsResult.new(agent_folder_path)
- agent = Agent.new(Liri.udp_port, Liri.tcp_port, source_code, runner, tests_result, agent_folder_path)
+ agent = Agent.new(Liri.udp_port, Liri.tcp_port, agent_folder_path)
threads = []
threads << agent.start_server_socket_to_process_manager_connection_request # Esperar y procesar la petición de conexión del Manager
- Liri.init_exit(stop, threads, 'Agent')
- Liri.logger.info("Agent process finished")
- rescue SignalException => e
- Liri.logger.info("Exception(#{e}) Agent process finished manually")
- Liri.kill(threads)
+ Liri.init_exit(stop, threads)
+ rescue SignalException
+ Liri.logger.info("Agent process finished manually", true)
+ rescue InxiCommandNotFoundError => e
+ Liri.logger.error("Exception(#{e}) Please, install inxi in your operating system", true)
+ ensure
+ # Siempre se ejecutan estos comandos, haya o no excepción
+ Liri.kill(threads) if threads && threads.any?
+ Liri.logger.info("Agent process finished", true)
end
end
- def initialize(udp_port, tcp_port, source_code, runner, tests_result, agent_folder_path)
+ def initialize(udp_port, tcp_port, agent_folder_path)
@udp_port = udp_port
@udp_socket = UDPSocket.new
@tcp_port = tcp_port
- @source_code = source_code
- @runner = runner
- @tests_result = tests_result
-
@all_tests = {}
@managers = {}
@agent_folder_path = agent_folder_path
+
+ @processing = true
+
+ @hardware_specs = hardware_specs
end
# Inicia un servidor udp que se mantiene en espera de la primera petición de conexión del Manager
def start_server_socket_to_process_manager_connection_request
# El servidor udp se ejecuta en bucle dentro de un hilo, esto permite realizar otras tareas mientras este hilo sigue esperando
@@ -58,72 +58,84 @@
Thread.new do
BasicSocket.do_not_reverse_lookup = true
begin
@udp_socket.bind('0.0.0.0', @udp_port)
rescue Errno::EADDRINUSE => e
- Liri.logger.error("Exception(#{e}) Busy UDP port #{@udp_port}")
+ Liri.logger.error("Exception(#{e}) Busy UDP port #{@udp_port}", true)
Thread.exit
end
Liri.logger.info("Waiting managers request in UDP port #{@udp_port}")
- loop do
+ while @processing
@manager_request = @udp_socket.recvfrom(1024)
manager_ip_address = @manager_request.last.last
manager_data = get_manager_data(JSON.parse(@manager_request.first))
+ # TODO: El cliente TCP debería inicicarse en otro hilo, de este modo se tendrá un cliente TCP para cada Manager
+ # y se evita que un mismo cliente procese la ejecución de varios Manager
process_manager_connection_request(manager_ip_address, manager_data)
end
end
end
# Inicia un cliente tcp para responder a la petición broadcast del Manager para que éste sepa donde enviar las pruebas
def start_client_socket_to_process_tests(manager_ip_address, manager_data)
tcp_socket = TCPSocket.open(manager_ip_address, @tcp_port)
agent_ip_address = tcp_socket.addr[2]
- tcp_socket.puts({ msg: 'get_source_code', hardware_model: get_hardware_model }.to_json)
+ tcp_socket.puts({ msg: 'get_source_code', hardware_specs: @hardware_specs }.to_json)
+ # Las siguientes variables se usan para guardar momentaneamente los resultados mientras se hace un chequeo de que
+ # el Manager siga ejecutandose o que ya no haya procesado los mismos tests ya ejecutados por otro agente
+ tests_result_file_name = ""
+ tests_result_file_path = ""
+ tests_result = {}
+
while line = tcp_socket.gets
tcp_socket_data = JSON.parse(line.chop)
msg = tcp_socket_data['msg']
if msg == 'already_connected' || msg == 'no_exist_tests' || msg == 'finish_agent'
break
end
if msg == 'proceed_get_source_code'
+ init_work_folders(manager_ip_address)
result = get_source_code(manager_ip_address, manager_data)
tcp_socket.puts({ msg: result }.to_json)
end
if msg == 'process_tests'
tests_batch = tcp_socket_data
tests = get_tests(tests_batch, manager_ip_address)
- raw_tests_result = @runner.run_tests(tests)
- tests_batch_number = tests_batch['tests_batch_number']
- tests_result_file_name = @tests_result.build_file_name(agent_ip_address, tests_batch_number)
- tests_result_file_path = @tests_result.save(tests_result_file_name, raw_tests_result)
+ compressed_file_folder_path = @managers[manager_ip_address][:compressed_file_folder_path]
+ decompressed_file_folder_path = @managers[manager_ip_address][:decompressed_file_folder_path]
+
+ runner = Agent::Runner.new(Liri.unit_test_class, decompressed_file_folder_path)
+ raw_tests_result = runner.run_tests(tests)
+ batch_num = tests_batch['batch_num']
+ tests_result = Common::TestsResult.new(compressed_file_folder_path)
+ tests_result_file_name = tests_result.build_file_name(agent_ip_address, batch_num)
+ tests_result_file_path = tests_result.save(tests_result_file_name, raw_tests_result)
+ # TODO No se debería enviar el resultado si otro agente ya lo procesó, porque osinó reemplazaría el archivo de resultados
+ # ya procesado. ACTUALIZACION: Puede que esto ya se haya arreglado
send_tests_results_file(manager_ip_address, manager_data, tests_result_file_path)
- result = { msg: 'processed_tests', tests_batch_number: tests_batch_number, tests_result_file_name: tests_result_file_name, tests_batch_keys_size: tests_batch['tests_batch_keys'].size}
- tcp_socket.puts(result.to_json) # Envía el número de lote y el nombre del archivo de resultados.
+ tests_result = { msg: 'processed_tests', batch_num: batch_num, tests_result_file_name: tests_result_file_name}
+ tcp_socket.puts(tests_result.to_json) # Envía el número de lote y el nombre del archivo de resultados.
end
end
- Liri.logger.info("Finish connection with Manager #{manager_ip_address} in TCP port: #{@tcp_port}")
tcp_socket.close
- Liri.clean_folder_content(@agent_folder_path)
-
unregister_manager(manager_ip_address)
rescue Errno::EADDRINUSE => e
Liri.logger.error("Exception(#{e}) Busy UDP port #{@udp_port}")
+ @processing = false
rescue Errno::ECONNRESET => e
tcp_socket.close
- Liri.logger.error("Exception(#{e}) Closed connection in TCP port #{@tcp_port}")
- Liri.logger.info("Finish connection with Manager #{manager_ip_address} in TCP port: #{@tcp_port}")
+ Liri.logger.error("Exception(#{e}) Closed connection in TCP port #{@tcp_port}", true)
unregister_manager(manager_ip_address)
rescue Errno::ECONNREFUSED => e
- Liri.logger.error("Exception(#{e}) Rejected connection in TCP port #{@tcp_port}")
- Liri.logger.info("Finish connection with Manager #{manager_ip_address} in TCP port: #{@tcp_port}")
+ Liri.logger.error("Exception(#{e}) Rejected connection in TCP port #{@tcp_port}", true)
unregister_manager(manager_ip_address)
end
private
@@ -138,29 +150,33 @@
end
end
def get_source_code(manager_ip_address, manager_data)
puts ''
+ compressed_file_folder_path = @managers[manager_ip_address][:compressed_file_folder_path]
+ decompressed_file_folder_path = @managers[manager_ip_address][:decompressed_file_folder_path]
+ source_code = Liri::Common::SourceCode.new(decompressed_file_folder_path,compressed_file_folder_path, "", Liri.compression_class, Liri.unit_test_class)
+
Liri::Common::Benchmarking.start(start_msg: "Getting source code. Wait... ", stdout: true) do
puts ''
Net::SCP.start(manager_ip_address, manager_data.user, password: manager_data.password) do |scp|
- scp.download!(manager_data.compressed_file_path, @source_code.compressed_file_folder_path)
+ scp.download!(manager_data.compressed_file_path, compressed_file_folder_path)
end
end
puts ''
downloaded_file_name = manager_data.compressed_file_path.split('/').last
- downloaded_file_path = File.join(@source_code.compressed_file_folder_path, '/', downloaded_file_name)
+ downloaded_file_path = File.join(compressed_file_folder_path, '/', downloaded_file_name)
Liri::Common::Benchmarking.start(start_msg: "Uncompressing source code. Wait... ", stdout: true) do
- @source_code.decompress_file(downloaded_file_path)
- @all_tests = @source_code.all_tests
+ source_code.decompress_file(downloaded_file_path)
+ @all_tests = source_code.all_tests
end
puts ''
# Se cambia temporalmente la carpeta de trabajo a la carpeta de código fuente descomprimida
- Dir.chdir(@source_code.decompressed_file_folder_path) do
+ Dir.chdir(decompressed_file_folder_path) do
# Se borra el directorio .git para evitar el siguiente error al ejecutar las pruebas: fatal: not a git repository (or any of the parent directories): .git
# Una mejor alternativa es no traer siquiera esa carpeta junto al código fuente excluyendo la carpeta .git al comprimir el código fuente.
# Por cuestiones de tiempo se procede a borrar la carpeta .git por ahora, aunque al parecer el error mostrado no afecta la ejecución del Agent
# Al realizar pruebas, el error mencionado se sigue viendo en producción así que no es seguro que este borrado de la carpeta .git solucione el problema
git_folder_path = File.join(Dir.pwd, '/.git')
@@ -233,23 +249,40 @@
# Se buscan obtienen los tests que coincidan con las claves recibidas de @all_tests = {1=>"spec/hash_spec.rb:2", 2=>"spec/hash_spec.rb:13", 3=>"spec/hash_spec.rb:24", ..., 29=>"spec/liri_spec.rb:62"}
# Se retorna un arreglo con los tests a ejecutar ["spec/liri_spec.rb:4", "spec/hash_spec.rb:5", "spec/hash_spec.rb:59", ..., "spec/hash_spec.rb:37"]
tests_keys.map { |test_key| @all_tests[test_key] }
end
- def get_hardware_model
- hardware_model = %x|cat /sys/devices/virtual/dmi/id/product_name|
- hardware_model.strip[0..14] # remove \n from string
+ def hardware_specs
+ "#{Common::Hardware.cpu} #{Common::Hardware.memory}GB"
end
def registered_manager?(manager_ip_address)
@managers[manager_ip_address]
end
def register_manager(manager_ip_address)
- @managers[manager_ip_address] = manager_ip_address
+ unless registered_manager?(manager_ip_address)
+ @managers[manager_ip_address] = {
+ manager_ip_address: @managers[manager_ip_address]
+ }
+ end
end
def unregister_manager(manager_ip_address)
@managers.remove!(manager_ip_address)
+ Liri.logger.info("Finish connection with Manager #{manager_ip_address} in TCP port: #{@tcp_port}")
+ end
+
+ def init_work_folders(manager_ip_address)
+ return if @managers[manager_ip_address][:compressed_file_folder_path]
+
+ compressed_file_folder_path = File.join(@agent_folder_path, '/', "#{DateTime.now.strftime("%d_%m_%y_%H_%M_%S")}_work")
+ Dir.mkdir(compressed_file_folder_path) unless Dir.exist?(compressed_file_folder_path)
+ decompressed_file_folder_path = File.join(compressed_file_folder_path, '/', Common::SourceCode::DECOMPRESSED_FOLDER_NAME)
+
+ @managers[manager_ip_address] = {
+ compressed_file_folder_path: compressed_file_folder_path,
+ decompressed_file_folder_path: decompressed_file_folder_path
+ }
end
end
end