lib/dynflow/executors/parallel.rb in dynflow-0.8.16 vs lib/dynflow/executors/parallel.rb in dynflow-0.8.17
- old
+ new
@@ -1,52 +1,22 @@
module Dynflow
module Executors
class Parallel < Abstract
-
- require 'dynflow/executors/parallel/sequence_cursor'
- require 'dynflow/executors/parallel/flow_manager'
- require 'dynflow/executors/parallel/work_queue'
- require 'dynflow/executors/parallel/execution_plan_manager'
- require 'dynflow/executors/parallel/sequential_manager'
- require 'dynflow/executors/parallel/running_steps_manager'
require 'dynflow/executors/parallel/core'
require 'dynflow/executors/parallel/pool'
require 'dynflow/executors/parallel/worker'
- UnprocessableEvent = Class.new(Dynflow::Error)
-
- Algebrick.type do |work|
- Work = work
-
- Work::Finalize = type do
- fields! sequential_manager: SequentialManager,
- execution_plan_id: String
- end
-
- Work::Step = type do
- fields! step: ExecutionPlan::Steps::AbstractFlowStep,
- execution_plan_id: String
- end
-
- Work::Event = type do
- fields! step: ExecutionPlan::Steps::AbstractFlowStep,
- execution_plan_id: String,
- event: Event
- end
-
- variants Work::Step, Work::Event, Work::Finalize
- end
-
def initialize(world, pool_size = 10)
super(world)
@core = Core.spawn name: 'parallel-executor-core',
args: [world, pool_size],
initialized: @core_initialized = Concurrent.future
end
- def execute(execution_plan_id, finished = Concurrent.future)
- @core.ask([:handle_execution, execution_plan_id, finished]).value!
+ def execute(execution_plan_id, finished = Concurrent.future, wait_for_acceptance = true)
+ accepted = @core.ask([:handle_execution, execution_plan_id, finished])
+ accepted.value! if wait_for_acceptance
finished
rescue Concurrent::Actor::ActorTerminated => error
dynflow_error = Dynflow::Error.new('executor terminated')
finished.fail dynflow_error unless finished.completed?
raise dynflow_error
@@ -54,10 +24,10 @@
finished.fail e unless finished.completed?
raise e
end
def event(execution_plan_id, step_id, event, future = Concurrent.future)
- @core.ask([:handle_event, Event[execution_plan_id, step_id, event, future]])
+ @core.ask([:handle_event, Director::Event[execution_plan_id, step_id, event, future]])
future
end
def terminate(future = Concurrent.future)
@core.tell([:start_termination, future])