lib/fasten/runner.rb in fasten-0.7.6 vs lib/fasten/runner.rb in fasten-0.8.0
- old
+ new
@@ -16,31 +16,30 @@
include Fasten::Support::State
include Fasten::Support::Stats
include Fasten::Support::UI
include Fasten::Support::Yaml
- attr_accessor :name, :stats, :summary, :workers, :worker_class, :fasten_dir, :use_threads, :ui_mode, :developer, :worker_list, :queue, :tasks
+ attr_accessor :name, :stats, :summary, :jobs, :worker_class, :fasten_dir, :use_threads, :ui_mode, :developer, :workers, :queue, :tasks
def initialize(**options)
- %i[name stats summary workers worker_class fasten_dir use_threads ui_mode developer].each do |key|
+ %i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer].each do |key|
options[key] = Fasten.send "default_#{key}" unless options.key? key
end
- @tasks = TaskManager.new
+ @tasks = TaskManager.new(targets: options[:targets] || [])
+ @workers = []
reconfigure(options)
end
def reconfigure(**options)
- %i[name stats summary workers worker_class fasten_dir use_threads ui_mode developer].each do |key|
+ %i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer].each do |key|
send "#{key}=", options[key] if options.key? key
end
initialize_stats
initialize_logger
-
- self.worker_list ||= []
end
def task(name, **opts, &block)
tasks << task = Task.new(name: name, **opts, block: block)
@@ -115,11 +114,11 @@
ui.force_clear
end
end
def should_wait_for_running_tasks?
- tasks.running? && (tasks.no_waiting? || tasks.failed? || %i[PAUSING QUITTING].include?(state)) || tasks.running.count >= workers
+ tasks.running? && (tasks.no_waiting? || tasks.failed? || %i[PAUSING QUITTING].include?(state)) || tasks.running.count >= jobs
end
def wait_for_running_tasks
use_threads ? wait_for_running_tasks_thread : wait_for_running_tasks_fork
end
@@ -128,17 +127,17 @@
self.queue ||= TimeoutQueue.new
while should_wait_for_running_tasks?
ui.update
- receive_workers_tasks_thread queue.receive_with_timeout(0.5)
+ receive_jobs_tasks_thread queue.receive_with_timeout(0.5)
end
ui.update
end
- def receive_workers_tasks_thread(items)
+ def receive_jobs_tasks_thread(items)
items&.each do |task|
tasks.running.delete task
task.worker.running_task = task.worker.state = nil
@@ -151,22 +150,22 @@
end
def wait_for_running_tasks_fork
while should_wait_for_running_tasks?
ui.update
- reads = worker_list.map(&:parent_read)
+ reads = workers.map(&:parent_read)
reads, _writes, _errors = IO.select(reads, [], [], 0.5)
- receive_workers_tasks_fork(reads)
+ receive_jobs_tasks_fork(reads)
end
ui.update
end
- def receive_workers_tasks_fork(reads)
+ def receive_jobs_tasks_fork(reads)
reads&.each do |read|
- next unless (worker = worker_list.find { |item| item.parent_read == read })
+ next unless (worker = workers.find { |item| item.parent_read == read })
task = worker.receive_response_from_child
tasks.running.delete task
@@ -204,39 +203,39 @@
raise message
end
end
def remove_workers_as_needed
- while worker_list.count > workers
- return unless (worker = worker_list.find { |item| item.running_task.nil? })
+ while workers.count > jobs
+ return unless (worker = workers.find { |item| item.running_task.nil? })
worker.kill
- worker_list.delete worker
+ workers.delete worker
ui.force_clear
end
end
def find_or_create_worker
- worker = worker_list.find { |item| item.running_task.nil? }
+ worker = workers.find { |item| item.running_task.nil? }
unless worker
@worker_id = (@worker_id || 0) + 1
worker = worker_class.new runner: self, name: "#{worker_class.to_s.gsub('::', '-')}-#{format '%02X', @worker_id}", use_threads: use_threads
worker.start
- worker_list << worker
+ workers << worker
log_info "Worker created: #{worker}"
ui.force_clear
end
worker
end
def dispatch_pending_tasks
- while tasks.waiting? && tasks.running.count < workers
+ while tasks.waiting? && tasks.running.count < jobs
worker = find_or_create_worker
task = tasks.next
log_ini task, "on worker #{worker}"
worker.send_request_to_child(task)
@@ -245,11 +244,11 @@
ui.force_clear
end
end
def remove_all_workers
- worker_list.each(&:kill)
- worker_list.clear
+ workers.each(&:kill)
+ workers.clear
ui.force_clear
end
def kind