lib/fasten/executor.rb in fasten-0.1.0 vs lib/fasten/executor.rb in fasten-0.2.0

- old
+ new

@@ -1,88 +1,126 @@ module Fasten class Executor < Task include Fasten::LogSupport + include Fasten::DAG + include Fasten::UI - def initialize(name: nil, max_child: 8) - super name: name || $PROGRAM_NAME - self.max_childs = max_child + def initialize(name: nil, workers: 8, worker_class: Fasten::Worker, fasten_dir: '.fasten') + super name: name || "#{self.class} #{$PID}", workers: workers, pid: $PID, state: :IDLE, worker_class: worker_class, fasten_dir: fasten_dir + initialize_dag - self.pid = $PID - self.tasks = {} - self.dag = Fasten::DAG.new - self.running = false - self.child_jobs = {} - self.running_tasks = [] + self.worker_list = [] + log_path = "#{fasten_dir}/log/executor/#{self.name}.log" + FileUtils.mkdir_p File.dirname(log_path) + self.log_file = File.new(log_path, 'a') + Fasten.logger.reopen log_file end - def add(task) - dag.add task - end - def perform - log_ini self - self.ini = Time.new - self.running = true + log_ini self, running_stats + self.state = :RUNNING - perform_loop + run_ui do + perform_loop + end - self.fin = Time.new - log_fin self + self.state = :IDLE + log_fin self, running_stats end - protected - - def log_ini(object) - log_info "Init #{dag.done.count + running_tasks.count}/#{dag.tasks.count} #{object.class} #{object} " + def done_stats + "#{task_done_list.count}/#{task_list.count}" end - def log_fin(object) - log_info "Done #{dag.done.count}/#{dag.tasks.count} #{object.class} #{object} in #{object.fin - object.ini}" + def running_stats + "#{task_done_list.count + task_running_list.count}/#{task_list.count}" end def perform_loop - while running - next_task = dag.next_task + loop do + wait_for_running_tasks + raise_error_in_failure + remove_workers_as_needed + dispatch_pending_tasks - wait_children next_task - run_next_task next_task + break if no_running_tasks? && no_waiting_tasks? + end - self.running = !(next_task.nil? && child_jobs.empty? && dag.waiting.empty?) + remove_all_workers + end + + def wait_for_running_tasks + while (no_waiting_tasks? && tasks_running?) || task_running_list.count >= workers || (tasks_running? && tasks_failed?) + ui_update + reads = worker_list.map(&:parent_read) + reads, _writes, _errors = IO.select(reads, [], [], 0.2) + + receive_workers_tasks(reads) end + ui_update + end - wait_remaining + def receive_workers_tasks(reads) + reads&.each do |read| + worker = worker_list.find { |item| item.parent_read == read } + task = worker.receive + + task_running_list.delete task + + update_task task + + log_fin task, done_stats + end end - def wait_children(next_task) - return unless (next_task.nil? && !child_jobs.empty?) || child_jobs.count >= max_childs + def raise_error_in_failure + return unless tasks_failed? - pid = Process.wait(0) - done_task = child_jobs.delete pid - return unless done_task + remove_all_workers - dag.update_task done_task, done: true, fin: Time.new - running_tasks.delete done_task + count = task_error_list.count - log_fin done_task + raise "Stopping because the following #{count} #{count == 1 ? 'task' : 'tasks'} failed: #{task_error_list.map(&:to_s).join(', ')}" end - def run_next_task(next_task) - return unless next_task + def remove_workers_as_needed + while worker_list.count > workers + return unless (worker = worker_list.find { |item| item.running_task.nil? }) - running_tasks << next_task - log_ini next_task + worker.kill + worker_list.delete worker + end + end - next_task.ini = Time.new - pid = fork do - next_task.perform + def find_or_create_worker + worker = worker_list.find { |item| item.running_task.nil? } + + unless worker + @worker_id = (@worker_id || 0) + 1 + worker = worker_class.new executor: self, name: "#{worker_class} #{format '%02X', @worker_id}" + worker.fork + worker_list << worker + + log_info "Worker created: #{worker}" end - child_jobs[pid] = next_task + + worker end - def wait_remaining - child_jobs.each do |child_pid, child_task| - Process.wait child_pid - dag.update_task child_task, done: true + def dispatch_pending_tasks + while tasks_waiting? && task_running_list.count < workers + worker = find_or_create_worker + + task = next_task + log_ini task, "on worker #{worker}" + worker.dispatch(task) + task_running_list << task + end + end + + def remove_all_workers + while (worker = worker_list.pop) + worker.kill end end end end