Sha256: 013f04c3b30e50be616c4ac984fc91fcadae56799ec123568c3c0fe9cea7a16b

Contents?: true

Size: 1.92 KB

Versions: 40

Compression:

Stored size: 1.92 KB

Contents

module Dynflow
  class Director
    class SequentialManager
      attr_reader :execution_plan, :world

      def initialize(world, execution_plan)
        @world          = world
        @execution_plan = execution_plan
        @done           = false
      end

      def run
        with_state_updates do
          dispatch(execution_plan.run_flow)
          finalize
        end

        return execution_plan
      end

      def finalize
        reset_finalize_steps
        unless execution_plan.error?
          step_id = execution_plan.finalize_flow.all_step_ids.first
          action_class = execution_plan.steps[step_id].action_class
          world.middleware.execute(:finalize_phase, action_class, execution_plan) do
            dispatch(execution_plan.finalize_flow)
          end
        end
        @done = true
      end

      def reset_finalize_steps
        execution_plan.finalize_flow.all_step_ids.each do |step_id|
          step       = execution_plan.steps[step_id]
          step.state = :pending if [:success, :error].include? step.state
        end
      end

      def done?
        @done
      end

      private

      def dispatch(flow)
        case flow
        when Flows::Sequence
          run_in_sequence(flow.flows)
        when Flows::Concurrence
          run_in_concurrence(flow.flows)
        when Flows::Atom
          run_step(execution_plan.steps[flow.step_id])
        else
          raise ArgumentError, "Don't know how to run #{flow}"
        end
      end

      def run_in_sequence(steps)
        steps.all? { |s| dispatch(s) }
      end

      def run_in_concurrence(steps)
        run_in_sequence(steps)
      end

      def run_step(step)
        step.execute
        return step.state != :error
      end

      def with_state_updates(&block)
        execution_plan.update_state(:running)
        block.call
        execution_plan.update_state(execution_plan.error? ? :paused : :stopped)
      end
    end
  end
end

Version data entries

40 entries across 40 versions & 1 rubygems

Version Path
dynflow-1.3.0 lib/dynflow/director/sequential_manager.rb
dynflow-1.2.3 lib/dynflow/director/sequential_manager.rb
dynflow-1.2.2 lib/dynflow/director/sequential_manager.rb
dynflow-1.2.1 lib/dynflow/director/sequential_manager.rb
dynflow-1.2.0 lib/dynflow/director/sequential_manager.rb
dynflow-1.2.0.pre1 lib/dynflow/director/sequential_manager.rb
dynflow-1.1.6 lib/dynflow/director/sequential_manager.rb
dynflow-1.1.5 lib/dynflow/director/sequential_manager.rb
dynflow-1.1.4 lib/dynflow/director/sequential_manager.rb
dynflow-1.1.3 lib/dynflow/director/sequential_manager.rb
dynflow-1.1.2 lib/dynflow/director/sequential_manager.rb
dynflow-1.1.1 lib/dynflow/director/sequential_manager.rb
dynflow-1.1.0 lib/dynflow/director/sequential_manager.rb
dynflow-1.0.5 lib/dynflow/director/sequential_manager.rb
dynflow-1.0.4 lib/dynflow/director/sequential_manager.rb
dynflow-1.0.3 lib/dynflow/director/sequential_manager.rb
dynflow-1.0.2 lib/dynflow/director/sequential_manager.rb
dynflow-1.0.1 lib/dynflow/director/sequential_manager.rb
dynflow-1.0.0 lib/dynflow/director/sequential_manager.rb
dynflow-0.8.37 lib/dynflow/director/sequential_manager.rb