lib/fasten/runner.rb in fasten-0.7.6 vs lib/fasten/runner.rb in fasten-0.8.0

- old
+ new

@@ -16,31 +16,30 @@ include Fasten::Support::State include Fasten::Support::Stats include Fasten::Support::UI include Fasten::Support::Yaml - attr_accessor :name, :stats, :summary, :workers, :worker_class, :fasten_dir, :use_threads, :ui_mode, :developer, :worker_list, :queue, :tasks + attr_accessor :name, :stats, :summary, :jobs, :worker_class, :fasten_dir, :use_threads, :ui_mode, :developer, :workers, :queue, :tasks def initialize(**options) - %i[name stats summary workers worker_class fasten_dir use_threads ui_mode developer].each do |key| + %i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer].each do |key| options[key] = Fasten.send "default_#{key}" unless options.key? key end - @tasks = TaskManager.new + @tasks = TaskManager.new(targets: options[:targets] || []) + @workers = [] reconfigure(options) end def reconfigure(**options) - %i[name stats summary workers worker_class fasten_dir use_threads ui_mode developer].each do |key| + %i[name stats summary jobs worker_class fasten_dir use_threads ui_mode developer].each do |key| send "#{key}=", options[key] if options.key? key end initialize_stats initialize_logger - - self.worker_list ||= [] end def task(name, **opts, &block) tasks << task = Task.new(name: name, **opts, block: block) @@ -115,11 +114,11 @@ ui.force_clear end end def should_wait_for_running_tasks? - tasks.running? && (tasks.no_waiting? || tasks.failed? || %i[PAUSING QUITTING].include?(state)) || tasks.running.count >= workers + tasks.running? && (tasks.no_waiting? || tasks.failed? || %i[PAUSING QUITTING].include?(state)) || tasks.running.count >= jobs end def wait_for_running_tasks use_threads ? wait_for_running_tasks_thread : wait_for_running_tasks_fork end @@ -128,17 +127,17 @@ self.queue ||= TimeoutQueue.new while should_wait_for_running_tasks? ui.update - receive_workers_tasks_thread queue.receive_with_timeout(0.5) + receive_jobs_tasks_thread queue.receive_with_timeout(0.5) end ui.update end - def receive_workers_tasks_thread(items) + def receive_jobs_tasks_thread(items) items&.each do |task| tasks.running.delete task task.worker.running_task = task.worker.state = nil @@ -151,22 +150,22 @@ end def wait_for_running_tasks_fork while should_wait_for_running_tasks? ui.update - reads = worker_list.map(&:parent_read) + reads = workers.map(&:parent_read) reads, _writes, _errors = IO.select(reads, [], [], 0.5) - receive_workers_tasks_fork(reads) + receive_jobs_tasks_fork(reads) end ui.update end - def receive_workers_tasks_fork(reads) + def receive_jobs_tasks_fork(reads) reads&.each do |read| - next unless (worker = worker_list.find { |item| item.parent_read == read }) + next unless (worker = workers.find { |item| item.parent_read == read }) task = worker.receive_response_from_child tasks.running.delete task @@ -204,39 +203,39 @@ raise message end end def remove_workers_as_needed - while worker_list.count > workers - return unless (worker = worker_list.find { |item| item.running_task.nil? }) + while workers.count > jobs + return unless (worker = workers.find { |item| item.running_task.nil? }) worker.kill - worker_list.delete worker + workers.delete worker ui.force_clear end end def find_or_create_worker - worker = worker_list.find { |item| item.running_task.nil? } + worker = workers.find { |item| item.running_task.nil? } unless worker @worker_id = (@worker_id || 0) + 1 worker = worker_class.new runner: self, name: "#{worker_class.to_s.gsub('::', '-')}-#{format '%02X', @worker_id}", use_threads: use_threads worker.start - worker_list << worker + workers << worker log_info "Worker created: #{worker}" ui.force_clear end worker end def dispatch_pending_tasks - while tasks.waiting? && tasks.running.count < workers + while tasks.waiting? && tasks.running.count < jobs worker = find_or_create_worker task = tasks.next log_ini task, "on worker #{worker}" worker.send_request_to_child(task) @@ -245,11 +244,11 @@ ui.force_clear end end def remove_all_workers - worker_list.each(&:kill) - worker_list.clear + workers.each(&:kill) + workers.clear ui.force_clear end def kind