lib/fasten/executor.rb in fasten-0.1.0 vs lib/fasten/executor.rb in fasten-0.2.0
- old
+ new
@@ -1,88 +1,126 @@
module Fasten
class Executor < Task
include Fasten::LogSupport
+ include Fasten::DAG
+ include Fasten::UI
- def initialize(name: nil, max_child: 8)
- super name: name || $PROGRAM_NAME
- self.max_childs = max_child
+ def initialize(name: nil, workers: 8, worker_class: Fasten::Worker, fasten_dir: '.fasten')
+ super name: name || "#{self.class} #{$PID}", workers: workers, pid: $PID, state: :IDLE, worker_class: worker_class, fasten_dir: fasten_dir
+ initialize_dag
- self.pid = $PID
- self.tasks = {}
- self.dag = Fasten::DAG.new
- self.running = false
- self.child_jobs = {}
- self.running_tasks = []
+ self.worker_list = []
+ log_path = "#{fasten_dir}/log/executor/#{self.name}.log"
+ FileUtils.mkdir_p File.dirname(log_path)
+ self.log_file = File.new(log_path, 'a')
+ Fasten.logger.reopen log_file
end
- def add(task)
- dag.add task
- end
-
def perform
- log_ini self
- self.ini = Time.new
- self.running = true
+ log_ini self, running_stats
+ self.state = :RUNNING
- perform_loop
+ run_ui do
+ perform_loop
+ end
- self.fin = Time.new
- log_fin self
+ self.state = :IDLE
+ log_fin self, running_stats
end
- protected
-
- def log_ini(object)
- log_info "Init #{dag.done.count + running_tasks.count}/#{dag.tasks.count} #{object.class} #{object} "
+ def done_stats
+ "#{task_done_list.count}/#{task_list.count}"
end
- def log_fin(object)
- log_info "Done #{dag.done.count}/#{dag.tasks.count} #{object.class} #{object} in #{object.fin - object.ini}"
+ def running_stats
+ "#{task_done_list.count + task_running_list.count}/#{task_list.count}"
end
def perform_loop
- while running
- next_task = dag.next_task
+ loop do
+ wait_for_running_tasks
+ raise_error_in_failure
+ remove_workers_as_needed
+ dispatch_pending_tasks
- wait_children next_task
- run_next_task next_task
+ break if no_running_tasks? && no_waiting_tasks?
+ end
- self.running = !(next_task.nil? && child_jobs.empty? && dag.waiting.empty?)
+ remove_all_workers
+ end
+
+ def wait_for_running_tasks
+ while (no_waiting_tasks? && tasks_running?) || task_running_list.count >= workers || (tasks_running? && tasks_failed?)
+ ui_update
+ reads = worker_list.map(&:parent_read)
+ reads, _writes, _errors = IO.select(reads, [], [], 0.2)
+
+ receive_workers_tasks(reads)
end
+ ui_update
+ end
- wait_remaining
+ def receive_workers_tasks(reads)
+ reads&.each do |read|
+ worker = worker_list.find { |item| item.parent_read == read }
+ task = worker.receive
+
+ task_running_list.delete task
+
+ update_task task
+
+ log_fin task, done_stats
+ end
end
- def wait_children(next_task)
- return unless (next_task.nil? && !child_jobs.empty?) || child_jobs.count >= max_childs
+ def raise_error_in_failure
+ return unless tasks_failed?
- pid = Process.wait(0)
- done_task = child_jobs.delete pid
- return unless done_task
+ remove_all_workers
- dag.update_task done_task, done: true, fin: Time.new
- running_tasks.delete done_task
+ count = task_error_list.count
- log_fin done_task
+ raise "Stopping because the following #{count} #{count == 1 ? 'task' : 'tasks'} failed: #{task_error_list.map(&:to_s).join(', ')}"
end
- def run_next_task(next_task)
- return unless next_task
+ def remove_workers_as_needed
+ while worker_list.count > workers
+ return unless (worker = worker_list.find { |item| item.running_task.nil? })
- running_tasks << next_task
- log_ini next_task
+ worker.kill
+ worker_list.delete worker
+ end
+ end
- next_task.ini = Time.new
- pid = fork do
- next_task.perform
+ def find_or_create_worker
+ worker = worker_list.find { |item| item.running_task.nil? }
+
+ unless worker
+ @worker_id = (@worker_id || 0) + 1
+ worker = worker_class.new executor: self, name: "#{worker_class} #{format '%02X', @worker_id}"
+ worker.fork
+ worker_list << worker
+
+ log_info "Worker created: #{worker}"
end
- child_jobs[pid] = next_task
+
+ worker
end
- def wait_remaining
- child_jobs.each do |child_pid, child_task|
- Process.wait child_pid
- dag.update_task child_task, done: true
+ def dispatch_pending_tasks
+ while tasks_waiting? && task_running_list.count < workers
+ worker = find_or_create_worker
+
+ task = next_task
+ log_ini task, "on worker #{worker}"
+ worker.dispatch(task)
+ task_running_list << task
+ end
+ end
+
+ def remove_all_workers
+ while (worker = worker_list.pop)
+ worker.kill
end
end
end
end