lib/manager/manager.rb in liri-0.3.0 vs lib/manager/manager.rb in liri-0.3.1
- old
+ new
@@ -13,33 +13,36 @@
# Inicia la ejecución del Manager
# @param stop [Boolean] el valor true es para que no se ejecute infinitamente el método en el test unitario.
def run(source_code_folder_path, stop = false)
return unless valid_project
- setup_manager = Liri.set_setup(source_code_folder_path)
+ setup_manager = Liri.set_setup(source_code_folder_path, :manager, manager_tests_results_folder_time: DateTime.now.strftime("%d_%m_%y_%H_%M_%S"))
manager_folder_path = setup_manager.manager_folder_path
+ manager_tests_results_folder_path = setup_manager.manager_tests_results_folder_path
Liri.set_logger(setup_manager.logs_folder_path, 'liri-manager.log')
- Liri.logger.info("Proceso Manager iniciado")
- Liri.logger.info("Presione Ctrl + c para terminar el proceso Manager manualmente\n", true)
+ Liri.logger.info('Manager process started')
+ Liri.logger.info("Press Ctrl + c to finish Manager process manually\n", true)
user, password = get_credentials(setup_manager.setup_folder_path)
source_code = compress_source_code(source_code_folder_path, manager_folder_path)
- manager_data = get_manager_data(user, password, manager_folder_path, source_code)
+ manager_data = get_manager_data(user, password, manager_tests_results_folder_path, source_code)
all_tests = get_all_tests(source_code)
- tests_result = Common::TestsResult.new(manager_folder_path)
+ tests_result = Common::TestsResult.new(manager_tests_results_folder_path)
- manager = Manager.new(Liri.udp_port, Liri.tcp_port, all_tests, tests_result, manager_folder_path)
+ manager = Manager.new(Liri.udp_port, Liri.tcp_port, all_tests, tests_result)
threads = []
threads << manager.start_client_socket_to_search_agents(manager_data) # Enviar peticiones broadcast a toda la red para encontrar Agents
manager.start_server_socket_to_process_tests(threads[0]) unless stop # Esperar y enviar los test unitarios a los Agents
+ source_code.delete_compressed_file
+
Liri.init_exit(stop, threads, 'Manager')
- Liri.logger.info("Proceso Manager terminado")
+ Liri.logger.info('Manager process finished')
rescue SignalException => e
- Liri.logger.info("Exception(#{e}) Proceso Manager terminado manualmente")
+ Liri.logger.info("Exception(#{e}) Proceso Manager process finished manually")
Liri.kill(threads)
end
def test_files_by_runner
Liri.setup.test_files_by_runner
@@ -48,13 +51,13 @@
private
def valid_project
if File.exist?(File.join(Dir.pwd, 'Gemfile'))
true
else
- Liri.logger.info("No se encuentra un archivo Gemfile por lo que se asume que el directorio actual no corresponde a un proyecto Ruby")
- Liri.logger.info("Liri sólo puede ejecutarse en proyectos Ruby")
- Liri.logger.info("Proceso Manager terminado")
+ Liri.logger.info('Not found Gemfile. Assuming run Manager in not Ruby project')
+ Liri.logger.info('Liri can be run only in Ruby projects')
+ Liri.logger.info('Manager process finished')
false
end
end
def get_credentials(setup_folder_path)
@@ -63,74 +66,70 @@
end
def compress_source_code(source_code_folder_path, manager_folder_path)
source_code = Common::SourceCode.new(source_code_folder_path, manager_folder_path, Liri.compression_class, Liri.unit_test_class)
- Common::Progressbar.start(total: nil, length: 100, format: 'Comprimiendo Código Fuente |%B| %a') do
+ Common::Progressbar.start(total: nil, length: 100, format: 'Compressing source code |%B| %a') do
source_code.compress_folder
end
puts "\n\n"
source_code
end
- def get_manager_data(user, password, manager_folder_path, source_code)
+ def get_manager_data(user, password, tests_results_folder_path, source_code)
Common::ManagerData.new(
- folder_path: manager_folder_path,
+ tests_results_folder_path: tests_results_folder_path,
compressed_file_path: source_code.compressed_file_path,
user: user,
password: password
)
end
def get_all_tests(source_code)
all_tests = {}
- Common::Progressbar.start(total: nil, length: 100, format: 'Extrayendo Pruebas Unitarias |%B| %a') do
+ Common::Progressbar.start(total: nil, length: 100, format: 'Getting unit tests |%B| %a') do
all_tests = source_code.all_tests
end
puts "\n\n"
all_tests
end
end
- def initialize(udp_port, tcp_port, all_tests, tests_result, manager_folder_path)
+ def initialize(udp_port, tcp_port, all_tests, tests_result)
@udp_port = udp_port
@udp_socket = UDPSocket.new
@tcp_port = tcp_port
@all_tests = all_tests
@all_tests_count = all_tests.size
@all_tests_results = {}
- @all_tests_results_count = 0
+ @files_processed = 0
@all_tests_processing_count = 0
@agents = {}
@agents_search_processing_enabled = true
@test_processing_enabled = true
@tests_batch_number = 0
- @tests_batches = {}
+ @processed_tests_batches = {}
@tests_result = tests_result
@semaphore = Mutex.new
- @manager_folder_path = manager_folder_path
-
@progressbar = ProgressBar.create(starting_at: 0, total: @all_tests_count, length: 100, format: 'Progress %c/%C |%b=%i| %p%% | %a')
end
# Inicia un cliente udp que hace un broadcast en toda la red para iniciar una conexión con los Agent que estén escuchando
def start_client_socket_to_search_agents(manager_data)
# El cliente udp se ejecuta en bucle dentro de un hilo, esto permite realizar otras tareas mientras este hilo sigue sondeando
# la red para obtener mas Agents. Una vez que los tests terminan de ejecutarse, este hilo será finalizado.
Thread.new do
- Liri.logger.info("Buscando Agentes... Espere")
- Liri.logger.info("Se emite un broadcast cada #{Liri.udp_request_delay} segundos en el puerto UDP: #{@udp_port}
- (Se mantiene escaneando la red para encontrar Agents)
- ")
+ Liri.logger.info('Searching agents... Wait')
+ Liri.logger.info("Sending UDP broadcast each #{Liri.udp_request_delay} seconds in UDP port: #{@udp_port}")
while agents_search_processing_enabled
@udp_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
@udp_socket.send(manager_data.to_h.to_json, 0, '<broadcast>', @udp_port)
sleep(Liri.udp_request_delay) # Se pausa un momento antes de efectuar nuevamente la petición broadcast
end
@@ -140,92 +139,98 @@
# Inicia un servidor tcp para procesar los pruebas después de haberse iniciado la conexión a través de udp
def start_server_socket_to_process_tests(search_agents_thread)
begin
tcp_socket = TCPServer.new(@tcp_port) # se hace un bind al puerto dado
rescue Errno::EADDRINUSE => e
- Liri.logger.error("Exception(#{e}) Puerto TCP #{@tcp_port} ocupado.")
+ Liri.logger.error("Exception(#{e}) Busy UDP port #{@tcp_port}.")
Thread.kill(search_agents_thread)
- Thread.exit
+ return
end
- Liri.logger.info("En espera para establecer conexión con los Agents en el puerto TCP: #{@tcp_port}
- (Se espera que algún Agent se conecte para ejecutar las pruebas como respuesta al broadcast UDP)
- ")
+ Liri.logger.info("Waiting Agents connection in TCP port: #{@tcp_port}")
# El siguiente bucle permite que varios clientes es decir Agents se conecten
# De: http://www.w3big.com/es/ruby/ruby-socket-programming.html
while test_processing_enabled
Thread.start(tcp_socket.accept) do |client|
agent_ip_address = client.remote_address.ip_address
- response = client.recvfrom(1000).first
+ hardware_model = nil
+ run_tests_batch_time_start = nil
- Liri.logger.info("\nConexión iniciada con el Agente: #{agent_ip_address}")
- Liri.logger.info("Respuesta al broadcast recibida del Agent: #{agent_ip_address} en el puerto TCP: #{@tcp_port}: #{response}")
+ while line = client.gets
+ client_data = JSON.parse(line.chop)
+ msg = client_data['msg']
- # Se le indica al agente que proceda
- client.puts({ msg: 'Recibido', exist_tests: all_tests.any? }.to_json)
+ if msg == 'get_source_code'
+ if registered_agent?(agent_ip_address)
+ client.puts({ msg: 'already_connected' }.to_json)
+ client.close
+ break
+ else
+ register_agent(agent_ip_address)
+ hardware_model = client_data['hardware_model']
+ msg = all_tests.any? ? 'proceed_get_source_code' : 'no_exist_tests'
+ client.puts({ msg: msg }.to_json)
+ end
+ end
- if all_tests.empty?
- # No importa lo que le haga, el broadcast udp no se muere al instante y el agente sigue respondiendo
- # Las siguientes dos lineas son para que se deje de hacer el broadcast pero aun asi se llegan a hacer
- # 3 a 4 broadcast antes de que se finalize el proceso, al parecer el broadcast va a tener que quedar asi
- # y mejorar el codigo para que se envien test pendientes para eso hay que llevar una lista de test pendientes
- # tests enviados sin resultados, tests finalizados, si se recibe respuesta al broadcast se trata de enviar primero test pendientes
- # luego test enviados sin resultados o sino ignorar
- Thread.kill(search_agents_thread)
- agents_search_processing_enabled = false
- Liri.logger.info("Se termina cualquier proceso pendiente con el Agent #{agent_ip_address} en el puerto TCP: #{@tcp_port}: #{response}")
- client.close
- Thread.exit
- end
+ if msg == 'get_source_code_fail'
+ client.puts({ msg: 'finish_agent' }.to_json)
+ client.close
+ break
+ end
- while all_tests.any?
- time_in_seconds = Liri::Common::Benchmarking.start(start_msg: "Proceso de Ejecución de pruebas. Agent: #{agent_ip_address}. Espere... ", end_msg: "Proceso de Ejecución de pruebas. Agent: #{agent_ip_address}. Duración: ", stdout: false) do
+ if msg == 'get_tests_files'
+ Liri.logger.info("Running unit tests. Agent: #{agent_ip_address}. Wait... ", false)
+ run_tests_batch_time_start = Time.now
+
tests_batch = tests_batch(agent_ip_address)
- break unless tests_batch
+ if tests_batch.empty?
+ client.puts({ msg: 'no_exist_tests' }.to_json)
+ client.close
+ break
+ else
+ client.puts(tests_batch.to_json) # Se envia el lote de tests
+ end
+ end
- begin
- Liri.logger.debug("Conjunto de pruebas enviadas al Agent #{agent_ip_address}: #{tests_batch}")
+ if msg == 'processed_tests'
+ tests_result = client_data
+ Liri.logger.debug("Agent response #{agent_ip_address}: #{tests_result}")
+ batch_run = Time.now - run_tests_batch_time_start
+ process_tests_result(agent_ip_address, hardware_model, tests_result, batch_run)
- client.puts(tests_batch.to_json) # Se envia el lote de tests
- response = client.recvfrom(1000).first # Se recibe la respuesta. Cuando mas alto es el parámetro de recvfrom, mas datos se reciben osino se truncan.
- rescue Errno::EPIPE => e
- # Esto al parecer se da cuando el Agent ya cerró las conexiones y el Manager intenta contactar
- Liri.logger.error("Exception(#{e}) El Agent #{agent_ip_address} ya terminó la conexión")
- # Si el Agente ya no responde es mejor romper el bucle para que no quede colgado
+ run_tests_batch_time_start = Time.now
+
+ tests_batch = tests_batch(agent_ip_address)
+ if tests_batch.empty?
+ client.puts({ msg: 'no_exist_tests' }.to_json)
+ client.close
break
+ else
+ client.puts(tests_batch.to_json) # Se envia el lote de tests
end
end
-
- # Se captura por si acaso los errores de parseo JSON
- begin
- tests_result = JSON.parse(response)
- Liri.logger.debug("Respuesta del Agent #{agent_ip_address}: #{tests_result}")
- process_tests_result(agent_ip_address, tests_result, time_in_seconds)
- rescue JSON::ParserError => e
- Liri.logger.error("Exception(#{e}) Error de parseo JSON")
- end
end
update_processing_statuses
- Liri.logger.info("Se termina la conexión con el Agent #{agent_ip_address} en el puerto TCP: #{@tcp_port}")
- begin
- client.puts('exit') # Se envía el string exit para que el Agent sepa que el proceso terminó
- client.close # se desconecta el cliente
- rescue Errno::EPIPE => e
- # Esto al parecer se da cuando el Agent ya cerró las conexiones y el Manager intenta contactar
- Liri.logger.error("Exception(#{e}) El Agent #{agent_ip_address} ya terminó la conexión")
- # Si el Agente ya no responde es mejor terminar el hilo. Aunque igual quedará colgado el Manager
- # mientras sigan pruebas pendientes
- Thread.exit
- end
+ Thread.kill(search_agents_thread)
+ unregister_agent(agent_ip_address)
+ rescue Errno::EPIPE => e
+ # Esto al parecer se da cuando el Agent ya cerró las conexiones y el Manager intenta contactar
+ Liri.logger.error("Exception(#{e}) Agent #{agent_ip_address} already finished connection")
+ # Si el Agente ya no responde es mejor terminar el hilo. Aunque igual quedará colgado el Manager
+ # mientras sigan pruebas pendientes
+ unregister_agent(agent_ip_address)
+ Thread.exit
end
end
- Liri.clean_folder_content(@manager_folder_path)
@tests_result.print_summary
print_agents_summary
- @tests_result.print_failures if Liri.print_failures
+ print_agents_detailed_summary if Liri.print_agents_detailed_summary
+ @tests_result.print_failures_list if Liri.print_failures_list
+ @tests_result.print_failed_examples if Liri.print_failed_examples
end
def all_tests
@semaphore.synchronize do
@all_tests
@@ -250,71 +255,130 @@
end
end
def update_processing_statuses
@semaphore.synchronize do
- @test_processing_enabled = false if @all_tests_count == @all_tests_results_count
+ @test_processing_enabled = false if @all_tests_count == @files_processed
@agents_search_processing_enabled = false if @all_tests_count == @all_tests_processing_count
end
end
def tests_batch(agent_ip_address)
# Se inicia un semáforo para evitar que varios hilos actualicen variables compartidas
@semaphore.synchronize do
- return nil if @all_tests.empty?
+ return {} if @all_tests.empty?
@tests_batch_number += 1 # Se numera cada lote
samples = @all_tests.sample!(Manager.test_files_by_runner) # Se obtiene algunos tests
samples_keys = samples.keys # Se obtiene la clave asignada a los tests
@all_tests_processing_count += samples_keys.size
- @agents[agent_ip_address] = { agent_ip_address: agent_ip_address, tests_processed_count: 0, examples: 0, failures: 0, time_in_seconds: 0, duration: '' } unless @agents[agent_ip_address]
-
- #@tests_batches[@tests_batch_number] = { agent_ip_address: agent_ip_address, tests_batch_keys: samples_keys } # Se guarda el lote a enviar
- tests_batch = { tests_batch_number: @tests_batch_number, tests_batch_keys: samples_keys } # Se construye el lote a enviar
+ tests_batch = { msg: 'process_tests', tests_batch_number: @tests_batch_number, tests_batch_keys: samples_keys } # Se construye el lote a enviar
+ Liri.logger.debug("Tests batches sent to Agent #{agent_ip_address}: #{tests_batch}")
tests_batch
end
end
- def process_tests_result(agent_ip_address, tests_result, time_in_seconds)
+ def process_tests_result(agent_ip_address, hardware_model, tests_result, batch_run)
# Se inicia un semáforo para evitar que varios hilos actualicen variables compartidas
@semaphore.synchronize do
tests_batch_number = tests_result['tests_batch_number']
tests_result_file_name = tests_result['tests_result_file_name']
- tests_batch_keys_size = tests_result['tests_batch_keys_size']
+ files_processed = tests_result['tests_batch_keys_size']
- #tests_batch_keys = @tests_batches[tests_batch_number][:tests_batch_keys]
- tests_processed_count = tests_batch_keys_size
- @all_tests_results_count += tests_processed_count
+ @files_processed += files_processed
- @progressbar.progress = @all_tests_results_count
+ @progressbar.progress = @files_processed
- #@tests_batches[tests_batch_number][:tests_result_file_name] = tests_result_file_name
+ tests_result = @tests_result.process(tests_result_file_name, files_processed)
- tests_result = @tests_result.process(tests_result_file_name)
+ @processed_tests_batches[tests_batch_number] = tests_result.clone
+ @processed_tests_batches[tests_batch_number][:batch_run] = batch_run
+ @processed_tests_batches[tests_batch_number][:agent_ip_address] = agent_ip_address
+ @processed_tests_batches[tests_batch_number][:hardware_model] = hardware_model
+ @processed_tests_batches[tests_batch_number][:tests_batch_number] = tests_batch_number
- @agents[agent_ip_address][:tests_processed_count] += tests_processed_count
- @agents[agent_ip_address][:examples] += tests_result[:example_quantity]
- @agents[agent_ip_address][:failures] += tests_result[:failure_quantity]
- @agents[agent_ip_address][:time_in_seconds] += time_in_seconds
- @agents[agent_ip_address][:duration] = @agents[agent_ip_address][:time_in_seconds].to_duration
-
- Liri.logger.info("Pruebas procesadas por Agente: #{agent_ip_address}: #{@agents[agent_ip_address][:tests_processed_count]}")
+ Liri.logger.info("Processed unit tests by Agent: #{agent_ip_address}: #{files_processed}")
end
end
def print_agents_summary
- rows = @agents.values.map { |line| line.values }
- headings = @agents.values.first.keys
+ processed_tests_batches_by_agent = processed_tests_batches_by_agents
+ rows = processed_tests_batches_by_agent.values.map do |value|
+ value[:finish_in] = value[:finish_in].to_duration
+ value[:files_load] = value[:files_load].to_duration
+ value[:batch_run] = value[:batch_run].to_duration
+ value.values
+ end
- table = Terminal::Table.new title: "Resúmen", headings: headings, rows: rows
- table.style = {padding_left: 3, border_x: "=", border_i: "x" }
- table.align_column(1, :right)
- table.align_column(2, :right)
- table.align_column(3, :right)
- table.align_column(4, :right)
- table.align_column(5, :right)
+ rows << Array.new(9) # Se agrega una linea vacia antes de mostrar los totales
+ rows << get_footer_values
+ header = processed_tests_batches_by_agent.values.first.keys
+ table = Terminal::Table.new title: 'Summary', headings: header, rows: rows
+ table.style = { padding_left: 3, border_x: '=', border_i: 'x'}
puts table
+ end
+
+ def processed_tests_batches_by_agents
+ tests_batches = {}
+ @processed_tests_batches.values.each do |processed_test_batch|
+ agent_ip_address = processed_test_batch[:agent_ip_address]
+ if tests_batches[agent_ip_address]
+ tests_batches[agent_ip_address][:examples] += processed_test_batch[:examples]
+ tests_batches[agent_ip_address][:failures] += processed_test_batch[:failures]
+ tests_batches[agent_ip_address][:pending] += processed_test_batch[:pending]
+ tests_batches[agent_ip_address][:passed] += processed_test_batch[:passed]
+ tests_batches[agent_ip_address][:finish_in] += processed_test_batch[:finish_in]
+ tests_batches[agent_ip_address][:files_load] += processed_test_batch[:files_load]
+ tests_batches[agent_ip_address][:files_processed] += processed_test_batch[:files_processed]
+ tests_batches[agent_ip_address][:batch_run] += processed_test_batch[:batch_run]
+ else
+ _processed_test_batch = processed_test_batch.clone # Clone to change values in other hash
+ _processed_test_batch.remove!(:failures_list, :failed_examples, :agent_ip_address, :tests_batch_number)
+ tests_batches[agent_ip_address] = _processed_test_batch
+ end
+ end
+ tests_batches
+ end
+
+ def print_agents_detailed_summary
+ puts "\n"
+ rows = @processed_tests_batches.values.map do |value|
+ value.remove!(:failures_list, :failed_examples, :agent_ip_address, :tests_batch_number)
+ value[:finish_in] = value[:finish_in].to_duration
+ value[:files_load] = value[:files_load].to_duration
+ value[:batch_run] = value[:batch_run].to_duration
+ value.values
+ end
+
+ rows << Array.new(9) # Se agrega una linea vacia antes de mostrar los totales
+ rows << get_footer_values
+ header = @processed_tests_batches.values.first.keys
+
+ table = Terminal::Table.new title: 'Detailed Summary', headings: header, rows: rows
+ table.style = { padding_left: 3, border_x: '=', border_i: 'x' }
+
+ puts table
+ end
+
+ def get_footer_values
+ footer = { examples: @tests_result.examples, failures: @tests_result.failures, pending: @tests_result.pending,
+ passed: @tests_result.passed, finish_in: "", files_load: "",
+ files_processed: @tests_result.files_processed, batch_run: "", hardware_model: "" }
+ footer.values
+ end
+
+ def registered_agent?(agent_ip_address)
+ @agents[agent_ip_address]
+ end
+
+ def register_agent(agent_ip_address)
+ @agents[agent_ip_address] = agent_ip_address
+ Liri.logger.info("\nStarted connection with Agent: #{agent_ip_address} in TCP port: #{@tcp_port}")
+ end
+
+ def unregister_agent(agent_ip_address)
+ @agents.remove!(agent_ip_address)
end
end
end
\ No newline at end of file