module Fasten class Executor < Task include Fasten::Logger include Fasten::DAG include Fasten::UI include Fasten::LoadSave include Fasten::Stats def initialize(name: nil, developer: STDIN.tty? && STDOUT.tty?, workers: Parallel.physical_processor_count, worker_class: Fasten::Worker, fasten_dir: '.fasten') setup_stats(name) super name: name || "#{self.class} #{$PID}", workers: workers, pid: $PID, state: :IDLE, worker_class: worker_class, fasten_dir: fasten_dir, developer: developer initialize_dag self.worker_list = [] log_path = "#{fasten_dir}/log/executor/#{self.name}.log" FileUtils.mkdir_p File.dirname(log_path) self.log_file = File.new(log_path, 'a') Fasten.logger.reopen log_file end def perform log_ini self, running_counters self.state = :RUNNING load_stats run_ui do perform_loop end self.state = task_list.map(&:state).all?(:DONE) ? :DONE : :FAIL log_fin self, running_counters stats_add_entry(state, self) save_stats end def done_counters "#{task_done_list.count}/#{task_list.count}" end def running_counters "#{task_done_list.count + task_running_list.count}/#{task_list.count}" end def perform_loop loop do wait_for_running_tasks raise_error_in_failure remove_workers_as_needed if %i[PAUSING PAUSED QUITTING].include?(state) check_state else dispatch_pending_tasks end break if no_running_tasks? && no_waiting_tasks? || state == :QUIT end remove_all_workers end def check_state if state == :PAUSING && no_running_tasks? self.state = :PAUSED self.ui.message = nil ui.force_clear elsif state == :QUITTING && no_running_tasks? self.state = :QUIT ui.force_clear end end def should_wait_for_running_tasks? tasks_running? && (no_waiting_tasks? || tasks_failed? || %i[PAUSING QUITTING].include?(state)) || task_running_list.count >= workers end def wait_for_running_tasks while should_wait_for_running_tasks? ui.update reads = worker_list.map(&:parent_read) reads, _writes, _errors = IO.select(reads, [], [], 1) receive_workers_tasks(reads) end ui.update end def receive_workers_tasks(reads) reads&.each do |read| next unless (worker = worker_list.find { |item| item.parent_read == read }) task = worker.receive_response task_running_list.delete task update_task task log_fin task, done_counters ui.force_clear end end def raise_error_in_failure return unless tasks_failed? task_error_list.each do |task| log_info "task: #{task} error:#{task.error}\n#{task.error&.backtrace&.join("\n")}" end if developer ui.cleanup puts "Stopping because the following tasks failed:\n" task_error_list.map(&:to_s).each { |x| puts " #{x}" } puts 'Entering development console' Kernel.binding.pry # rubocop:disable Lint/Debugger else remove_all_workers raise "Stopping because the following tasks failed: #{task_error_list.map(&:to_s).join(', ')}" end end def remove_workers_as_needed while worker_list.count > workers return unless (worker = worker_list.find { |item| item.running_task.nil? }) worker.kill worker_list.delete worker ui.force_clear end end def find_or_create_worker worker = worker_list.find { |item| item.running_task.nil? } unless worker @worker_id = (@worker_id || 0) + 1 worker = worker_class.new executor: self, name: "#{worker_class} #{format '%02X', @worker_id}" worker.block = block if block worker.fork worker_list << worker log_info "Worker created: #{worker}" ui.force_clear end worker end def dispatch_pending_tasks while tasks_waiting? && task_running_list.count < workers worker = find_or_create_worker task = next_task log_ini task, "on worker #{worker}" worker.send_request(task) task_running_list << task ui.force_clear end end def remove_all_workers worker_list.each(&:kill) worker_list.clear ui.force_clear end end end