lib/dynflow/executors/parallel.rb in dynflow-0.7.9 vs lib/dynflow/executors/parallel.rb in dynflow-0.8.0
- old
+ new
@@ -34,37 +34,40 @@
end
variants Work::Step, Work::Event, Work::Finalize
end
- PoolDone = Algebrick.type { fields! work: Work }
- PoolTerminated = Algebrick.atom
- WorkerDone = Algebrick.type { fields! work: Work, worker: Worker }
-
def initialize(world, pool_size = 10)
super(world)
- @core = Core.new world, pool_size
+ @core = Core.spawn name: 'parallel-executor-core',
+ args: [world, pool_size],
+ initialized: @core_initialized = Concurrent.future
end
- def execute(execution_plan_id, finished = Future.new)
- @core.ask(Execution[execution_plan_id, finished]).value!
+ def execute(execution_plan_id, finished = Concurrent.future)
+ @core.ask([:handle_execution, execution_plan_id, finished]).value!
finished
+ rescue Concurrent::Actor::ActorTerminated => error
+ dynflow_error = Dynflow::Error.new('executor terminated')
+ finished.fail dynflow_error unless finished.completed?
+ raise dynflow_error
rescue => e
- finished.fail e unless finished.ready?
+ finished.fail e unless finished.completed?
raise e
end
- def event(execution_plan_id, step_id, event, future = Future.new)
- @core << Event[execution_plan_id, step_id, event, future]
+ def event(execution_plan_id, step_id, event, future = Concurrent.future)
+ @core.ask([:handle_event, Event[execution_plan_id, step_id, event, future]])
future
end
- def terminate(future = Future.new)
- @core.ask(MicroActor::Terminate, future)
+ def terminate(future = Concurrent.future)
+ @core.tell([:start_termination, future])
+ future
end
def initialized
- @core.initialized
+ @core_initialized
end
end
end
end