lib/dynflow/executors/parallel.rb in dynflow-0.7.9 vs lib/dynflow/executors/parallel.rb in dynflow-0.8.0

- old
+ new

@@ -34,37 +34,40 @@ end variants Work::Step, Work::Event, Work::Finalize end - PoolDone = Algebrick.type { fields! work: Work } - PoolTerminated = Algebrick.atom - WorkerDone = Algebrick.type { fields! work: Work, worker: Worker } - def initialize(world, pool_size = 10) super(world) - @core = Core.new world, pool_size + @core = Core.spawn name: 'parallel-executor-core', + args: [world, pool_size], + initialized: @core_initialized = Concurrent.future end - def execute(execution_plan_id, finished = Future.new) - @core.ask(Execution[execution_plan_id, finished]).value! + def execute(execution_plan_id, finished = Concurrent.future) + @core.ask([:handle_execution, execution_plan_id, finished]).value! finished + rescue Concurrent::Actor::ActorTerminated => error + dynflow_error = Dynflow::Error.new('executor terminated') + finished.fail dynflow_error unless finished.completed? + raise dynflow_error rescue => e - finished.fail e unless finished.ready? + finished.fail e unless finished.completed? raise e end - def event(execution_plan_id, step_id, event, future = Future.new) - @core << Event[execution_plan_id, step_id, event, future] + def event(execution_plan_id, step_id, event, future = Concurrent.future) + @core.ask([:handle_event, Event[execution_plan_id, step_id, event, future]]) future end - def terminate(future = Future.new) - @core.ask(MicroActor::Terminate, future) + def terminate(future = Concurrent.future) + @core.tell([:start_termination, future]) + future end def initialized - @core.initialized + @core_initialized end end end end