# frozen_string_literal: true module Dynflow # rubocop:disable Metrics/ClassLength class Action < Serializable OutputReference = ExecutionPlan::OutputReference include Algebrick::TypeCheck include Algebrick::Matching require 'dynflow/action/format' extend Action::Format require 'dynflow/action/progress' include Action::Progress require 'dynflow/action/rescue' include Action::Rescue require 'dynflow/action/suspended' require 'dynflow/action/missing' require 'dynflow/action/polling' require 'dynflow/action/cancellable' require 'dynflow/action/singleton' require 'dynflow/action/with_sub_plans' require 'dynflow/action/with_bulk_sub_plans' require 'dynflow/action/with_polling_sub_plans' require 'dynflow/action/v2' def self.all_children children.values.inject(children.values) do |children, child| children + child.all_children end end def self.inherited(child) children[child.name] = child child.inherit_execution_plan_hooks(execution_plan_hooks.dup) super child end def self.children @children ||= {} end def self.middleware @middleware ||= Middleware::Register.new end def self.execution_plan_hooks @execution_plan_hooks ||= ExecutionPlan::Hooks::Register.new end def self.inherit_execution_plan_hooks(hooks) @execution_plan_hooks = hooks end # FIND define subscriptions in world independent on action's classes, # limited only by in/output formats # @return [nil, Class] a child of Action def self.subscribe nil end ERROR = Object.new SUSPEND = Object.new Skip = Algebrick.atom Phase = Algebrick.type do Executable = type do variants Plan = atom, Run = atom, Finalize = atom end variants Executable, Present = atom end module Executable def execute_method_name match self, (on Plan, :execute_plan), (on Run, :execute_run), (on Finalize, :execute_finalize) end end module Phase def to_s_humanized to_s.split('::').last end end DelayedEvent = Algebrick.type do fields! execution_plan_id: String, step_id: Integer, event: Object, time: type { variants Time, NilClass }, optional: Algebrick::Types::Boolean end def self.constantize(action_name) super action_name rescue NameError Action::Missing.generate(action_name) end attr_reader :world, :phase, :execution_plan_id, :id, :input, :plan_step_id, :run_step_id, :finalize_step_id, :caller_execution_plan_id, :caller_action_id, :pending_output_chunks middleware.use Action::Progress::Calculate def initialize(attributes, world) Type! attributes, Hash @phase = Type! attributes.fetch(:phase), Phase @world = Type! world, World @step = Type! attributes.fetch(:step, nil), ExecutionPlan::Steps::Abstract, NilClass raise ArgumentError, 'Step reference missing' if phase?(Executable) && @step.nil? @execution_plan_id = Type! attributes.fetch(:execution_plan_id), String @id = Type! attributes.fetch(:id), Integer @plan_step_id = Type! attributes.fetch(:plan_step_id), Integer @run_step_id = Type! attributes.fetch(:run_step_id), Integer, NilClass @finalize_step_id = Type! attributes.fetch(:finalize_step_id), Integer, NilClass @execution_plan = Type!(attributes.fetch(:execution_plan), ExecutionPlan) if phase? Present @caller_execution_plan_id = Type!(attributes.fetch(:caller_execution_plan_id, nil), String, NilClass) @caller_action_id = Type!(attributes.fetch(:caller_action_id, nil), Integer, NilClass) getter = ->key, required do required ? attributes.fetch(key) : attributes.fetch(key, {}) end @input = OutputReference.deserialize getter.(:input, phase?(Run, Finalize, Present)) @output = OutputReference.deserialize getter.(:output, false) if phase? Run, Finalize, Present @pending_output_chunks = [] if phase? Run, Finalize end def phase?(*phases) Match? phase, *phases end def phase!(*phases) phase?(*phases) or raise TypeError, "Wrong phase #{phase}, required #{phases}" end def label self.class.name end def input=(hash) Type! hash, Hash phase! Plan @input = Utils.indifferent_hash(hash) end def output=(hash) Type! hash, Hash phase! Run @output = Utils.indifferent_hash(hash) end def output if phase? Plan @output_reference or raise 'plan_self has to be invoked before being able to reference the output' else @output end end def output_chunk(chunk, kind: nil, timestamp: Time.now) @pending_output_chunks << { chunk: chunk, kind: kind, timestamp: timestamp } end def stored_output_chunks @output_chunks ||= world.persistence.load_output_chunks(@execution_plan_id, @id) end def drop_output_chunks! @pending_output_chunks = [] @output_chunks = [] world.persistence.delete_output_chunks(@execution_plan_id, @id) end def caller_action phase! Present return nil if @caller_action_id return @caller_action if @caller_action caller_execution_plan = if @caller_execution_plan_id.nil? execution_plan else world.persistence.load_execution_plan(@caller_execution_plan_id) end @caller_action = world.persistence.load_action_for_presentation(caller_execution_plan, @caller_action_id) end def set_plan_context(execution_plan, triggering_action, from_subscription) phase! Plan @execution_plan = Type! execution_plan, ExecutionPlan @triggering_action = Type! triggering_action, Action, NilClass @from_subscription = Type! from_subscription, TrueClass, FalseClass end # action that caused this action to be planned. Available only in planning phase def triggering_action phase! Plan @triggering_action end def from_subscription? phase! Plan @from_subscription end def execution_plan phase! Plan, Present @execution_plan end def action_logger world.action_logger end def plan_step phase! Present execution_plan.steps.fetch(plan_step_id) end # @param [Class] filter_class return only actions which are kind of `filter_class` # @return [Array] of directly planned actions by this action, # returned actions are in Present phase def planned_actions(filter = Action) phase! Present plan_step .planned_steps(execution_plan) .map { |s| s.action(execution_plan) } .select { |a| a.is_a?(filter) } end # @param [Class] filter_class return only actions which are kind of `filter_class` # @return [Array] of all (including indirectly) planned actions by this action, # returned actions are in Present phase def all_planned_actions(filter_class = Action) phase! Present mine = planned_actions (mine + mine.reduce([]) { |arr, action| arr + action.all_planned_actions }) .select { |a| a.is_a?(filter_class) } end def run_step phase! Present execution_plan.steps.fetch(run_step_id) if run_step_id end def finalize_step phase! Present execution_plan.steps.fetch(finalize_step_id) if finalize_step_id end def steps [plan_step, run_step, finalize_step] end def to_hash recursive_to_hash( { class: self.class.name, execution_plan_id: execution_plan_id, id: id, plan_step_id: plan_step_id, run_step_id: run_step_id, finalize_step_id: finalize_step_id, caller_execution_plan_id: caller_execution_plan_id, caller_action_id: caller_action_id, input: input }, if phase? Run, Finalize, Present { output: output } end ) end def state raise "state data not available" if @step.nil? @step.state end # @override to define more descriptive state information for the # action: used in Dynflow console def humanized_state state.to_s end def error raise "error data not available" if @step.nil? @step.error end def execute(*args) phase! Executable self.send phase.execute_method_name, *args end # @api private # @return [Array] - ids of steps referenced from action def required_step_ids(input = self.input) results = [] recursion = ->value do case value when Hash value.values.each { |v| recursion.(v) } when Array value.each { |v| recursion.(v) } when ExecutionPlan::OutputReference results << value.step_id else # no reference hidden in this arg end results end recursion.(input) end def execute_delay(delay_options, *args) with_error_handling(true) do world.middleware.execute(:delay, self, delay_options, *args) do |*new_args| @serializer = delay(*new_args).tap do |serializer| serializer.perform_serialization! end end end end def serializer raise "The action must be delayed in order to access the serializer" if @serializer.nil? @serializer end def holds_singleton_lock? false end # @override define what pool should the action be run in. The # queue defined here will also be used as the default queue for # all the steps planned under this action, unless overrided by sub-action def queue end # Plan an +event+ to be send to the action defined by +action+, what defaults to be self. # if +time+ is not passed, event is sent as soon as possible. def plan_event(event, time = nil, execution_plan_id: self.execution_plan_id, step_id: self.run_step_id, optional: false) time = @world.clock.current_time + time if time.is_a?(Numeric) delayed_events << DelayedEvent[execution_plan_id, step_id, event, time, optional] end def delayed_events @delayed_events ||= [] end protected def state=(state) phase! Executable @world.logger.debug format('%13s %s:%2d %9s >> %9s in phase %8s %s', 'Step', execution_plan_id, @step.id, self.state, state, phase.to_s_humanized, self.class) @step.state = state end # If this save returns an integer, it means it was an update. The number # represents the number of updated records. If it is 0, then the step was in # an unexpected state and couldn't be updated def save_state(conditions = {}) phase! Executable @step.save(conditions) end def delay(delay_options, *args) Serializers::Noop.new(args) end # @override to implement the action's *Plan phase* behaviour. # By default it plans itself and expects input-hash. # Use #plan_self and #plan_action methods to plan actions. # It can use DB in this phase. def plan(*args) if from_subscription? # if the action is triggered by subscription, by default use the # input of parent action. # should be replaced by referencing the input from input format plan_self(input.merge(triggering_action.input)) else # in this case, the action was triggered by plan_action. Use # the argument specified there. plan_self(*args) end self end # Add this method to implement the action's *Run phase* behaviour. # It should not use DB in this phase. def run(event = nil) # just a rdoc placeholder end remove_method :run # Add this method to implement the action's *Finalize phase* behaviour. # It can use DB in this phase. def finalize # just a rdoc placeholder end remove_method :finalize def run_accepts_events? method(:run).arity != 0 end def self.new_from_hash(hash, world) hash.delete(:output) if hash[:output].nil? unless hash[:execution_plan_uuid].nil? hash[:execution_plan_id] = hash[:execution_plan_uuid] end new(hash, world) end private # DSL for plan phase def concurrence(&block) phase! Plan @execution_plan.switch_flow(Flows::Concurrence.new([]), &block) end def sequence(&block) phase! Plan @execution_plan.switch_flow(Flows::Sequence.new([]), &block) end def plan_self(input = {}) phase! Plan self.input.update input if self.respond_to?(:run) run_step = @execution_plan.add_run_step(self) @run_step_id = run_step.id @output_reference = OutputReference.new(@execution_plan.id, run_step.id, id) end if self.respond_to?(:finalize) finalize_step = @execution_plan.add_finalize_step(self) @finalize_step_id = finalize_step.id end return self # to stay consistent with plan_action end def plan_action(action_class, *args) phase! Plan @execution_plan.add_plan_step(action_class, self).execute(@execution_plan, self, false, *args) end # DSL for run phase def suspended_action phase! Run @suspended_action ||= Action::Suspended.new(self) end def suspend(&block) phase! Run block.call suspended_action if block throw SUSPEND, SUSPEND end # DSL to terminate action execution and set it to error def error!(error) phase! Executable set_error(error) throw ERROR end def with_error_handling(propagate_error = nil, &block) raise "wrong state #{self.state}" unless [:scheduling, :skipping, :running].include?(self.state) begin catch(ERROR) { block.call } rescue Exception => error set_error(error) # reraise low-level exceptions raise error unless Type? error, StandardError, ScriptError end case self.state when :scheduling self.state = :pending when :running self.state = :success when :skipping self.state = :skipped when :suspended, :error # Do nothing else raise "wrong state #{self.state}" end if propagate_error && self.state == :error raise(@step.error.exception) end end def set_error(error) phase! Executable Type! error, Exception, String action_logger.error error self.state = :error @step.error = ExecutionPlan::Steps::Error.new(error) end def execute_plan(*args) phase! Plan self.state = :running save_state # when the error occurred inside the planning, catch that # before getting out of the planning phase with_error_handling(!root_action?) do concurrence do world.middleware.execute(:plan, self, *args) do |*new_args| plan(*new_args) end end subscribed_actions = world.subscribed_actions(self.class) if subscribed_actions.any? # we encapsulate the flow for this action into a concurrence and # add the subscribed flows to it as well. trigger_flow = @execution_plan.current_run_flow.sub_flows.pop @execution_plan.switch_flow(Flows::Concurrence.new([trigger_flow].compact)) do subscribed_actions.each do |action_class| new_plan_step = @execution_plan.add_plan_step(action_class, self) new_plan_step.execute(@execution_plan, self, true, *args) end end end check_serializable :input end end # TODO: This is getting out of hand, refactoring needed def execute_run(event) phase! Run @world.logger.debug format('%13s %s:%2d got event %s', 'Step', execution_plan_id, @step.id, event) if event case when state == :running raise NotImplementedError, 'recovery after restart is not implemented' when [:pending, :error, :skipping, :suspended].include?(state) if event && state != :suspended raise 'event can be processed only when in suspended state' end old_state = self.state self.state = :running unless self.state == :skipping saved = save_state(:state => %w(pending error skipping suspended)) if saved.kind_of?(Integer) && !saved.positive? # The step was already in a state we're trying to transition to, most # likely we were about to execute it for the second time after first # execution was forcefully interrupted. # Set error and return to prevent the step from being executed twice set_error "Could not transition step from #{old_state} to #{self.state}, step already in #{self.state}." return end @input = OutputReference.dereference @input, world.persistence with_error_handling do event = Skip if state == :skipping # we run the Skip event only when the run accepts events if event != Skip || run_accepts_events? result = catch(SUSPEND) do world.middleware.execute(:run, self, *[event].compact) do |*args| run(*args) end end self.state = :suspended if result == SUSPEND end check_serializable :output end else raise "wrong state #{state} when event:#{event}" end end def execute_finalize phase! Finalize @input = OutputReference.dereference @input, world.persistence self.state = :running save_state with_error_handling do world.middleware.execute(:finalize, self) do finalize end end end def check_serializable(what) Match! what, :input, :output value = send what recursive_to_hash value # it raises when not serializable rescue => e value.replace not_serializable: true raise e end def root_action? # in planning phase, the @triggered_action can be used to check whether the is root (the main action used # to create the execution plan). # For post-planning phases, the action is in root when either: # - the @caller_action_id is not set OR # - the @caller_action_id is set but the @caller_execution_plan_id is set as well # which means, the @caller_action_id is actually referencing different execution plan # and this action is creating a new execution plan, that's tracked as sub-plan # for the @caller_execution_plan_id @triggering_action.nil? && (@caller_action_id.nil? || @caller_execution_plan_id) end # An action must be a singleton and have a singleton lock def self.singleton_locked?(world) if self.ancestors.include? ::Dynflow::Action::Singleton lock_class = ::Dynflow::Coordinator::SingletonActionLock world.coordinator.find_locks(lock_class.unique_filter(self.name)).any? else false end end end # rubocop:enable Metrics/ClassLength end