# frozen_string_literal: true module Mutant # Parallel execution engine of arbitrary payloads module Parallel # Run async computation returning driver # # @param [World] world # @param [Config] config # # @return [Driver] def self.async(config:, world:) shared = shared_state(world, config) world.process_warmup workers = workers(world, config, shared) Driver.new( workers:, threads: threads(world, config, workers), **shared ) end # rubocop:disable Metrics/MethodLength def self.workers(world, config, shared) Array.new(config.jobs) do |index| Worker.start( block: config.block, index:, on_process_start: config.on_process_start, process_name: "#{config.process_name}-#{index}", timeout: config.timeout, world:, **shared ) end end private_class_method :workers # rubocop:enable Metrics/MethodLength def self.shared_state(world, config) { var_active_jobs: shared(Variable::IVar, world, value: Set.new), var_final: shared(Variable::IVar, world), var_running: shared(Variable::MVar, world, value: config.jobs), var_sink: shared(Variable::IVar, world, value: config.sink), var_source: shared(Variable::IVar, world, value: config.source) } end private_class_method :shared_state def self.threads(world, config, workers) thread = world.thread workers.map do |worker| thread.new do thread.current.name = "#{config.thread_name}-#{worker.index}" worker.call end end end private_class_method :threads def self.shared(klass, world, **attributes) klass.new( condition_variable: world.condition_variable, mutex: world.mutex, **attributes ) end private_class_method :shared # Job result sink signature module Sink include AbstractType # Process job result # # @param [Response] # # @return [self] abstract_method :response # The sink status # # @return [Object] abstract_method :status # Test if processing should stop # # @return [Boolean] abstract_method :stop? end # Sink # Parallel run configuration class Config include Adamantium, Anima.new( :block, :jobs, :on_process_start, :process_name, :sink, :source, :thread_name, :timeout ) end # Config class Response include Anima.new(:error, :job, :log, :result) end # Parallel execution status class Status include Adamantium, Anima.new( :active_jobs, :done, :payload ) alias_method :done?, :done end # Status end # Parallel end # Mutant