lib/dynflow/executors/parallel.rb in dynflow-0.8.16 vs lib/dynflow/executors/parallel.rb in dynflow-0.8.17

- old
+ new

@@ -1,52 +1,22 @@ module Dynflow module Executors class Parallel < Abstract - - require 'dynflow/executors/parallel/sequence_cursor' - require 'dynflow/executors/parallel/flow_manager' - require 'dynflow/executors/parallel/work_queue' - require 'dynflow/executors/parallel/execution_plan_manager' - require 'dynflow/executors/parallel/sequential_manager' - require 'dynflow/executors/parallel/running_steps_manager' require 'dynflow/executors/parallel/core' require 'dynflow/executors/parallel/pool' require 'dynflow/executors/parallel/worker' - UnprocessableEvent = Class.new(Dynflow::Error) - - Algebrick.type do |work| - Work = work - - Work::Finalize = type do - fields! sequential_manager: SequentialManager, - execution_plan_id: String - end - - Work::Step = type do - fields! step: ExecutionPlan::Steps::AbstractFlowStep, - execution_plan_id: String - end - - Work::Event = type do - fields! step: ExecutionPlan::Steps::AbstractFlowStep, - execution_plan_id: String, - event: Event - end - - variants Work::Step, Work::Event, Work::Finalize - end - def initialize(world, pool_size = 10) super(world) @core = Core.spawn name: 'parallel-executor-core', args: [world, pool_size], initialized: @core_initialized = Concurrent.future end - def execute(execution_plan_id, finished = Concurrent.future) - @core.ask([:handle_execution, execution_plan_id, finished]).value! + def execute(execution_plan_id, finished = Concurrent.future, wait_for_acceptance = true) + accepted = @core.ask([:handle_execution, execution_plan_id, finished]) + accepted.value! if wait_for_acceptance finished rescue Concurrent::Actor::ActorTerminated => error dynflow_error = Dynflow::Error.new('executor terminated') finished.fail dynflow_error unless finished.completed? raise dynflow_error @@ -54,10 +24,10 @@ finished.fail e unless finished.completed? raise e end def event(execution_plan_id, step_id, event, future = Concurrent.future) - @core.ask([:handle_event, Event[execution_plan_id, step_id, event, future]]) + @core.ask([:handle_event, Director::Event[execution_plan_id, step_id, event, future]]) future end def terminate(future = Concurrent.future) @core.tell([:start_termination, future])