lib/fasten/executor.rb in fasten-0.4.0 vs lib/fasten/executor.rb in fasten-0.5.0

- old
+ new

@@ -1,16 +1,16 @@ module Fasten class Executor < Task - include Fasten::LogSupport + include Fasten::Logger include Fasten::DAG include Fasten::UI include Fasten::LoadSave include Fasten::Stats - def initialize(name: nil, workers: 8, worker_class: Fasten::Worker, fasten_dir: '.fasten') + def initialize(name: nil, developer: STDIN.tty? && STDOUT.tty?, workers: Parallel.physical_processor_count, worker_class: Fasten::Worker, fasten_dir: '.fasten') setup_stats(name) - super name: name || "#{self.class} #{$PID}", workers: workers, pid: $PID, state: :IDLE, worker_class: worker_class, fasten_dir: fasten_dir + super name: name || "#{self.class} #{$PID}", workers: workers, pid: $PID, state: :IDLE, worker_class: worker_class, fasten_dir: fasten_dir, developer: developer initialize_dag self.worker_list = [] log_path = "#{fasten_dir}/log/executor/#{self.name}.log" FileUtils.mkdir_p File.dirname(log_path) @@ -28,11 +28,11 @@ end self.state = task_list.map(&:state).all?(:DONE) ? :DONE : :FAIL log_fin self, running_counters - stats_add_entry(self, state, self) + stats_add_entry(state, self) save_stats end def done_counters "#{task_done_list.count}/#{task_list.count}" @@ -45,71 +45,110 @@ def perform_loop loop do wait_for_running_tasks raise_error_in_failure remove_workers_as_needed - dispatch_pending_tasks + if %i[PAUSING PAUSED QUITTING].include?(state) + check_state + else + dispatch_pending_tasks + end - break if no_running_tasks? && no_waiting_tasks? + break if no_running_tasks? && no_waiting_tasks? || state == :QUIT end remove_all_workers end + def check_state + if state == :PAUSING && no_running_tasks? + self.state = :PAUSED + self.ui.message = nil + ui.force_clear + elsif state == :QUITTING && no_running_tasks? + self.state = :QUIT + ui.force_clear + end + end + + def should_wait_for_running_tasks? + tasks_running? && (no_waiting_tasks? || tasks_failed? || %i[PAUSING QUITTING].include?(state)) || task_running_list.count >= workers + end + def wait_for_running_tasks - while (no_waiting_tasks? && tasks_running?) || task_running_list.count >= workers || (tasks_running? && tasks_failed?) - ui_update + while should_wait_for_running_tasks? + ui.update reads = worker_list.map(&:parent_read) - reads, _writes, _errors = IO.select(reads, [], [], 0.2) + reads, _writes, _errors = IO.select(reads, [], [], 1) receive_workers_tasks(reads) end - ui_update + + ui.update end def receive_workers_tasks(reads) reads&.each do |read| - worker = worker_list.find { |item| item.parent_read == read } - task = worker.receive + next unless (worker = worker_list.find { |item| item.parent_read == read }) + task = worker.receive_response + task_running_list.delete task update_task task log_fin task, done_counters + ui.force_clear end end def raise_error_in_failure return unless tasks_failed? - remove_all_workers + task_error_list.each do |task| + log_info "task: #{task} error:#{task.error}\n#{task.error&.backtrace&.join("\n")}" + end - count = task_error_list.count + if developer + ui.cleanup + puts "Stopping because the following tasks failed:\n" + task_error_list.map(&:to_s).each { |x| puts " #{x}" } - raise "Stopping because the following #{count} #{count == 1 ? 'task' : 'tasks'} failed: #{task_error_list.map(&:to_s).join(', ')}" + puts 'Entering development console' + + Kernel.binding.pry # rubocop:disable Lint/Debugger + else + remove_all_workers + + raise "Stopping because the following tasks failed: #{task_error_list.map(&:to_s).join(', ')}" + end end def remove_workers_as_needed while worker_list.count > workers return unless (worker = worker_list.find { |item| item.running_task.nil? }) worker.kill worker_list.delete worker + + ui.force_clear end end def find_or_create_worker worker = worker_list.find { |item| item.running_task.nil? } unless worker @worker_id = (@worker_id || 0) + 1 worker = worker_class.new executor: self, name: "#{worker_class} #{format '%02X', @worker_id}" + worker.block = block if block worker.fork worker_list << worker log_info "Worker created: #{worker}" + + ui.force_clear end worker end @@ -117,17 +156,20 @@ while tasks_waiting? && task_running_list.count < workers worker = find_or_create_worker task = next_task log_ini task, "on worker #{worker}" - worker.dispatch(task) + worker.send_request(task) task_running_list << task + + ui.force_clear end end def remove_all_workers - while (worker = worker_list.pop) - worker.kill - end + worker_list.each(&:kill) + worker_list.clear + + ui.force_clear end end end