Sha256: 67cab059eaa162a70f20a714a81385e99f7c7a69422229d48c1d607967dcb7b5

Contents?: true

Size: 1.94 KB

Versions: 9

Compression:

Stored size: 1.94 KB

Contents

module Dynflow
  module Executors
    # Executes the flows of a single execution plan one step after another.
    #
    # Sequence and concurrence flows are both walked in order, and the walk of
    # a flow stops at the first step that ends up in the +:error+ state.
    class Parallel::SequentialManager
      attr_reader :execution_plan, :world

      # @param world the object whose +middleware+ wraps the finalize phase
      # @param execution_plan the plan whose run and finalize flows get executed
      def initialize(world, execution_plan)
        @world          = world
        @execution_plan = execution_plan
        @done           = false
      end

      # Runs the plan's run flow followed by the finalize phase. The plan is
      # switched to +:running+ beforehand and to +:paused+ (when it reports an
      # error) or +:stopped+ afterwards.
      #
      # @return the execution plan
      def run
        with_state_updates do
          dispatch(execution_plan.run_flow)
          finalize
        end

        execution_plan
      end

      # Resets the finalize steps and, unless the plan reports an error, runs
      # the finalize flow inside the +:finalize_phase+ middleware. The manager
      # is marked as done once this method gets to its end.
      def finalize
        reset_finalize_steps
        run_finalize_flow unless execution_plan.error?
        @done = true
      end

      # Puts every finalize step that is currently in the +:success+ or
      # +:error+ state back to +:pending+; steps in other states are untouched.
      def reset_finalize_steps
        execution_plan.finalize_flow.all_step_ids.each do |step_id|
          step       = execution_plan.steps[step_id]
          step.state = :pending if [:success, :error].include? step.state
        end
      end

      # @return [Boolean] true once {#finalize} has completed
      def done?
        @done
      end

      private

      # Runs the finalize flow wrapped in the +:finalize_phase+ middleware,
      # using the action class of the first finalize step.
      def run_finalize_flow
        first_step_id = execution_plan.finalize_flow.all_step_ids.first
        # A plan without any finalize step has no action class to hand to the
        # middleware; skip the phase instead of failing on a nil step lookup.
        return if first_step_id.nil?

        action_class = execution_plan.steps[first_step_id].action_class
        world.middleware.execute(:finalize_phase, action_class, execution_plan) do
          dispatch(execution_plan.finalize_flow)
        end
      end

      # Executes +flow+ according to its type.
      #
      # @param flow a Flows::Sequence, Flows::Concurrence or Flows::Atom
      # @return [Boolean] false when a step of the flow ended in +:error+
      # @raise [ArgumentError] when the flow is of any other type
      def dispatch(flow)
        case flow
        when Flows::Sequence
          run_in_sequence(flow.flows)
        when Flows::Concurrence
          run_in_concurrence(flow.flows)
        when Flows::Atom
          run_step(execution_plan.steps[flow.step_id])
        else
          raise ArgumentError, "Don't know how to run #{flow}"
        end
      end

      # Dispatches the given flows in order; +all?+ short-circuits, so flows
      # after the first failing one are not executed.
      #
      # @return [Boolean] true when every flow succeeded
      def run_in_sequence(steps)
        steps.all? { |s| dispatch(s) }
      end

      # This manager has no real concurrency: concurrent flows are executed
      # exactly like sequential ones.
      def run_in_concurrence(steps)
        run_in_sequence(steps)
      end

      # Executes a single step.
      #
      # @return [Boolean] true unless the step is in the +:error+ state afterwards
      def run_step(step)
        step.execute
        step.state != :error
      end

      # Marks the plan as +:running+, yields, and then marks it as +:paused+
      # when the plan reports an error or +:stopped+ otherwise. The final state
      # update is not performed when the block raises.
      def with_state_updates
        execution_plan.update_state(:running)
        yield
        execution_plan.update_state(execution_plan.error? ? :paused : :stopped)
      end
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
dynflow-0.8.16 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.15 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.14 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.13 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.12 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.11 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.10 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.9 lib/dynflow/executors/parallel/sequential_manager.rb
dynflow-0.8.8 lib/dynflow/executors/parallel/sequential_manager.rb