lib/fasten/executor.rb in fasten-0.4.0 vs lib/fasten/executor.rb in fasten-0.5.0
- old
+ new
@@ -1,16 +1,16 @@
module Fasten
class Executor < Task
- include Fasten::LogSupport
+ include Fasten::Logger
include Fasten::DAG
include Fasten::UI
include Fasten::LoadSave
include Fasten::Stats
- def initialize(name: nil, workers: 8, worker_class: Fasten::Worker, fasten_dir: '.fasten')
+ def initialize(name: nil, developer: STDIN.tty? && STDOUT.tty?, workers: Parallel.physical_processor_count, worker_class: Fasten::Worker, fasten_dir: '.fasten')
setup_stats(name)
- super name: name || "#{self.class} #{$PID}", workers: workers, pid: $PID, state: :IDLE, worker_class: worker_class, fasten_dir: fasten_dir
+ super name: name || "#{self.class} #{$PID}", workers: workers, pid: $PID, state: :IDLE, worker_class: worker_class, fasten_dir: fasten_dir, developer: developer
initialize_dag
self.worker_list = []
log_path = "#{fasten_dir}/log/executor/#{self.name}.log"
FileUtils.mkdir_p File.dirname(log_path)
@@ -28,11 +28,11 @@
end
self.state = task_list.map(&:state).all?(:DONE) ? :DONE : :FAIL
log_fin self, running_counters
- stats_add_entry(self, state, self)
+ stats_add_entry(state, self)
save_stats
end
def done_counters
"#{task_done_list.count}/#{task_list.count}"
@@ -45,71 +45,110 @@
def perform_loop
loop do
wait_for_running_tasks
raise_error_in_failure
remove_workers_as_needed
- dispatch_pending_tasks
+ if %i[PAUSING PAUSED QUITTING].include?(state)
+ check_state
+ else
+ dispatch_pending_tasks
+ end
- break if no_running_tasks? && no_waiting_tasks?
+ break if no_running_tasks? && no_waiting_tasks? || state == :QUIT
end
remove_all_workers
end
+ def check_state
+ if state == :PAUSING && no_running_tasks?
+ self.state = :PAUSED
+ self.ui.message = nil
+ ui.force_clear
+ elsif state == :QUITTING && no_running_tasks?
+ self.state = :QUIT
+ ui.force_clear
+ end
+ end
+
+ def should_wait_for_running_tasks?
+ tasks_running? && (no_waiting_tasks? || tasks_failed? || %i[PAUSING QUITTING].include?(state)) || task_running_list.count >= workers
+ end
+
def wait_for_running_tasks
- while (no_waiting_tasks? && tasks_running?) || task_running_list.count >= workers || (tasks_running? && tasks_failed?)
- ui_update
+ while should_wait_for_running_tasks?
+ ui.update
reads = worker_list.map(&:parent_read)
- reads, _writes, _errors = IO.select(reads, [], [], 0.2)
+ reads, _writes, _errors = IO.select(reads, [], [], 1)
receive_workers_tasks(reads)
end
- ui_update
+
+ ui.update
end
def receive_workers_tasks(reads)
reads&.each do |read|
- worker = worker_list.find { |item| item.parent_read == read }
- task = worker.receive
+ next unless (worker = worker_list.find { |item| item.parent_read == read })
+ task = worker.receive_response
+
task_running_list.delete task
update_task task
log_fin task, done_counters
+ ui.force_clear
end
end
def raise_error_in_failure
return unless tasks_failed?
- remove_all_workers
+ task_error_list.each do |task|
+ log_info "task: #{task} error:#{task.error}\n#{task.error&.backtrace&.join("\n")}"
+ end
- count = task_error_list.count
+ if developer
+ ui.cleanup
+ puts "Stopping because the following tasks failed:\n"
+ task_error_list.map(&:to_s).each { |x| puts " #{x}" }
- raise "Stopping because the following #{count} #{count == 1 ? 'task' : 'tasks'} failed: #{task_error_list.map(&:to_s).join(', ')}"
+ puts 'Entering development console'
+
+ Kernel.binding.pry # rubocop:disable Lint/Debugger
+ else
+ remove_all_workers
+
+ raise "Stopping because the following tasks failed: #{task_error_list.map(&:to_s).join(', ')}"
+ end
end
def remove_workers_as_needed
while worker_list.count > workers
return unless (worker = worker_list.find { |item| item.running_task.nil? })
worker.kill
worker_list.delete worker
+
+ ui.force_clear
end
end
def find_or_create_worker
worker = worker_list.find { |item| item.running_task.nil? }
unless worker
@worker_id = (@worker_id || 0) + 1
worker = worker_class.new executor: self, name: "#{worker_class} #{format '%02X', @worker_id}"
+ worker.block = block if block
worker.fork
worker_list << worker
log_info "Worker created: #{worker}"
+
+ ui.force_clear
end
worker
end
@@ -117,17 +156,20 @@
while tasks_waiting? && task_running_list.count < workers
worker = find_or_create_worker
task = next_task
log_ini task, "on worker #{worker}"
- worker.dispatch(task)
+ worker.send_request(task)
task_running_list << task
+
+ ui.force_clear
end
end
def remove_all_workers
- while (worker = worker_list.pop)
- worker.kill
- end
+ worker_list.each(&:kill)
+ worker_list.clear
+
+ ui.force_clear
end
end
end