module Eco
  module API
    class Session
      class Config
        # A tree of named _stages_. Each stage can hold `before`, `on` and `after` callbacks,
        # as well as `rescue` / `exit_handle` callbacks, and may have its own sub-stages.
        # The allowed stages (and their nesting) are declared via `WORKFLOW_MODEL`.
        class Workflow
          extend Eco::API::Common::ClassHierarchy

          # Declares the stages of the root workflow and, via nested hashes, their sub-stages.
          WORKFLOW_MODEL = [
            :options,
            {load: [{input: [:get, :filter]}, {people: [:get, :filter]}, :filter]},
            :usecases,
            :launch_jobs,
            {post_launch: [:usecases, :launch_jobs]},
            :report,
            :end,
            :close
          ]

          class << self
            # @return the stage keys of this workflow class (as given by `model_attrs`).
            def stages
              model_attrs
            end

            # @return the model of this workflow class, or an empty `Hash` if there is none.
            def model
              super || {}
            end

            # @param stage [Symbol] the key of the stage to check.
            # @return [String, nil] an error message if `stage` is not among `stages`, `nil` otherwise.
            # @note it does **not** raise; it is up to the caller to act on the returned message.
            def validate_stage(stage)
              "Unknown Workflow stage '#{stage}'. Should be any of #{stages}" unless stages.include?(stage)
            end

            # Builds the workflow class of the sub-stage `key`, with the sub-model `model[key]`.
            # @param key [Symbol] the key of the sub-stage.
            # @return the result of `new_class` (a class inheriting from `Workflow`, per the `inherits:` option).
            def workflow_class(key)
              class_name = to_constant(key.to_s)

              new_class(class_name, inherits: Eco::API::Session::Config::Workflow) do |klass|
                klass.model = model[key]
              end
            end
          end

          self.model = WORKFLOW_MODEL

          attr_reader :config

          # @param name [Symbol, nil] the name (key) of this stage.
          # @param _parent [Eco::API::Session::Config::Workflow] the parent stage
          #   (it defaults to the stage itself, which makes it the `root?` stage).
          # @param config the configuration object, handed over to every sub-stage.
          def initialize(name = nil, _parent: self, config:)
            @config  = config
            @name    = name
            @stages  = {}
            @_parent = _parent
            @pending = true

            # moments
            @on     = nil
            @before = []
            @after  = []
          end

          # @param with_path [Boolean] whether the names of the parent stages should be prepended.
          # @return the name of the stage; with `with_path` (and not being the root stage)
          #   a dot-separated `String` with the names of the ancestors and this stage.
          def name(with_path: false)
            return @name if !with_path || root?

            [@_parent.name(with_path: true), @name].compact.join('.')
          end

          # Has this stage run yet?
          # @note it does **not** include _sub-stages_ that run `before`
          # @return [Boolean] `true` if it has **not** run yet, `false` otherwise
          def pending?
            @pending
          end

          # Do not run this stage!
          def skip!
            @skip    = true
            @pending = false
          end

          # Has this stage been marked as to be skipped
          # @return [Boolean] depends on this order:
          #   - `true` if `skip!` was called
          #   - `false` if the current _stage_ is `root?` (the top stage of the hierarchy)
          #   - `true` if its parent task is to be skipped
          def skip?
            return @skip if instance_variable_defined?(:@skip)
            return false if root?

            @_parent.skip?
          end

          # Used in **configuration** time **to configure** the _workflow_ of the target (sub)stage `key`
          # @note
          #   1. if a `block` is provided it will `yield` the target stage immediately
          #   2. a `block` is only required when `key` has not been specified.
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow] further _workflow_ configuration `for` the target stage `key`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @raise [ArgumentError] if neither `key` nor a `block` is given.
          # @return [Eco::API::Session::Config::Workflow]
          #   1. if block is provided, it returns the **current stage** object (to ease chaining).
          #   2. if block is not provided, it returns the **stage** referred by `key`
          def for(key = nil, &block)
            raise ArgumentError, "With no 'key', a block should be given." unless key || block_given?

            # Without a block there is nothing to configure: hand over the target stage.
            return stage(key) unless block_given?

            if key
              stage(key).for(&block)
            else
              yield(self)
            end

            self
          end
          alias_method :with, :for

          # Used in **configuration** time **to define** the **behaviour** the target (sub)stage `key`
          # @note if a `block` is provided it will **not** `yield` the target stage immediately,
          #   but when the _workflow_ reaches the stage
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow, io] the behaviour of the target stage `key` when the _workflow_ reaches it
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] the `io` input/output object carried throughout all the _workflow_
          # @raise [ArgumentError] if no `block` is given.
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def on(key = nil, &block)
            raise ArgumentError, "A block should be given." unless block_given?

            if key
              stage(key).on(&block)
            else
              @on = block
            end

            self
          end

          # When there is an `Exception`, you might have defined some `callback` to do something with it
          # (i.e. register, email)
          # @note when called without a block, it returns the currently defined `callback` (if any).
          # @yield [exception, io] the `callback` to do something with an `Exception` raised within this _workflow_ stage
          # @yieldparam exception [Exception] the exception object that was raised
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] the `io` input/output object carried throughout all the _workflow_
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def rescue(&block)
            return @rescue unless block

            @rescue = block
            self
          end
          # Prevent reserved word clash on DSL
          alias_method :exception, :rescue

          # Called on `SystemExit` exception
          # @note when called without a block, it returns the currently defined `callback` (if any).
          # @return [Eco::API::Session::Config::Workflow] the current stage object, when a block is given.
          def exit_handle(&block)
            return @exit_handle unless block

            @exit_handle = block
            self
          end

          # Used in **configuration** time **add previous** `callbacks` **before** the `on` _callback_
          # of the (sub)stage `key` is actually `run`
          # @note
          #   - it will **not** `yield` it immediately, but when the _workflow_ reaches the target stage
          #   - in this case, you can define multiple `callbacks`
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow, io] one of the things to do **before** the `on` _callback_
          #   of the (sub)stage `key` is actually `run`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] `io` the input/output object carried throughout all the _workflow_
          # @raise [ArgumentError] if no `block` is given.
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def before(key = nil, &block)
            raise ArgumentError, "A block should be given." unless block_given?

            if key
              stage(key).before(&block)
            else
              @before.push(block)
            end

            self
          end

          # Used in **configuration** time **add** `callbacks` to run **after** the `on` _callback_
          # of the (sub)stage `key` is actually `run`
          # @note
          #   - it will **not** `yield` it immediately, but when the _workflow_ reaches the target stage
          #   - in this case, you can define multiple `callbacks`
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow, io] one of the things to do **after** the `on` _callback_
          #   of the (sub)stage `key` is actually `run`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] `io` the input/output object carried throughout all the _workflow_
          # @raise [ArgumentError] if no `block` is given.
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def after(key = nil, &block)
            raise ArgumentError, "A block should be given." unless block_given?

            if key
              stage(key).after(&block)
            else
              @after.push(block)
            end

            self
          end

          # Used in run time to **execute the workflow** of the (sub)stage `key`
          # @note if a `block` is **not** provided:
          #   - it will run the `before` _callbacks_ defined during the configuration time
          #   - it will run the _workflow_ of any defined _**substage**_ of the `key` stage
          #   - it will run the `on` _callback_ defined during the configuration time
          #   - it will mark the stage as **not** `pending?`.
          #   - it will run the `after` _callbacks_ defined during the configuration time
          # @note if a `block` is provided:
          #   - it will **not** run the workflow of the substages to `key` stage
          #   - it will **not** run the `callback` for `on` defined during the configuration time
          #   - it will rather `yield` the target stage after all the `before` _callbacks_ have been run
          #   - aside of this, the rest will be the same as when the _block_ is **not** provided (see previous note)
          # @note if the stage is marked to be skipped (see `skip?`), the `before` and `after` _callbacks_
          #   still run, but neither the `block`, the substages nor the `on` _callback_ do.
          # @note if the object returned by `before`, `after` and `run` callbacks is not an `Eco::API::UseCases::BaseIO`,
          #   the original `io` object will be returned instead.
          # @note on `SystemExit` the `exit_handle` callback is evaluated and the process exits with the same status;
          #   on `Interrupt` the exception is re-raised as is; on any other `Exception` the `rescue` callback
          #   is evaluated and the exception is re-raised.
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @param io [Eco::API::UseCases::BaseIO] the input/output object
          # @yield [stage_workflow, io] if a `block` is provided, see `note`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] the `io` input/output object carried throughout all the _workflow_
          # @raise [RuntimeError] if `io` is not an `Eco::API::UseCases::BaseIO`.
          # @return [Eco::API::UseCases::BaseIO] the `io` object that results of running the stage.
          def run(key = nil, io:, &block)
            raise "Missing BaseIO object" unless io.is_a?(Eco::API::UseCases::BaseIO)

            if key
              # Delegate to the targeted sub-stage
              io = io_result(io: io) do
                stage(key).run(io: io, &block)
              end
            elsif pending?
              @before.each do |c|
                io = io_result(io: io) do
                  io.evaluate(self, io, &c)
                end
              end

              unless skip?
                io.session.logger.debug("(Workflow: #{path}) running now")

                if block
                  io = io_result(io: io) do
                    io.evaluate(self, io, &block)
                  end
                else
                  existing_stages.each do |stg|
                    io = io_result(io: io) do
                      stg.run(io: io)
                    end
                  end

                  unless ready?
                    msg = "(Workflow: #{path}) 'on' callback is not defined, nor block given"
                    io.session.logger.debug(msg)
                  end

                  io = io_result(io: io) do
                    io.evaluate(self, io, &@on)
                  end
                end

                @pending = false
              end

              @after.each do |c|
                io = io_result(io: io) do
                  io.evaluate(self, io, &c)
                end
              end
            end

            io
          rescue SystemExit => e
            io = io_result(io: io) do
              io.evaluate(e, io, &self.exit_handle)
            end
            exit e.status
          rescue Interrupt
            # Re-raise as is: the `rescue` callback below is not meant for user interruptions
            raise
          rescue Exception => e
            # raise unless self.rescue
            io = io_result(io: io) do
              io.evaluate(e, io, &self.rescue)
            end
            raise
          ensure
            @pending = false
          end

          protected

          # @return [Boolean] `true` if this stage is its own parent (top of the hierarchy).
          def root?
            @_parent == self
          end

          # It ensures an `Eco::API::UseCases::BaseIO` is returned
          # @note it always brings the IO to be a `BaseIO`.
          #   As the `output` and the type is captured, it just cleans usecase related
          #   logic from the object.
          # @param io [Eco::API::UseCases::BaseIO] the fallback `io`, used when the block
          #   does not return an instance of `klass`.
          # @param klass [Class] the expected class of the result of the block.
          # @yieldreturn the candidate `io` object.
          # @raise [ArgumentError] if `klass` isn't child of `Eco::API::UseCases::BaseIO`
          # @return [Eco::API::UseCases::BaseIO] the `io` input/output object carried throughout the _workflow_
          def io_result(io:, klass: Eco::API::UseCases::BaseIO)
            msg = "Expecting class to be child of Eco::API::UseCases::BaseIO. Given: #{klass}"
            raise ArgumentError, msg unless klass <= Eco::API::UseCases::BaseIO

            result = yield
            io     = result if result.is_a?(klass)

            # Anything that is not exactly a `klass` instance is reduced via `#base`
            io.instance_of?(klass) ? io : io.base
          end

          # @return the name of the root stage, or a colon-separated `String`
          #   with the path of the parent stage and the name of this stage.
          def path
            return name if root?

            "#{@_parent.path}:#{name}"
          end

          # @return [Boolean] `true` if the `on` callback has been defined.
          def ready?
            !!@on
          end

          # @return [Array<Eco::API::Session::Config::Workflow>] the sub-stages that have been
          #   referred to so far, in the order declared by the model of the class.
          def existing_stages
            # sort defined stages by stage order
            sorted_keys = self.class.stages & @stages.keys
            sorted_keys.map {|k| stage(k)}
          end

          # @return [Array<Eco::API::Session::Config::Workflow>] the `existing_stages` that are `ready?`.
          def ready_stages
            existing_stages.select {|s| s.ready?}
          end

          # @return [Boolean] `true` if all the `existing_stages` are `ready?`.
          def stages_ready?
            existing_stages.all? {|s| s.ready?}
          end

          # Retrieves (and lazily creates) the sub-stage referred by `key`.
          # @param key [Symbol] the key of the sub-stage.
          # @return [Eco::API::Session::Config::Workflow] the sub-stage object.
          def stage(key)
            # NOTE(review): `validate_stage` only returns a message, which is discarded here,
            #   so an unknown `key` is not rejected - confirm whether this should raise.
            self.class.validate_stage(key)
            @stages[key] ||= self.class.workflow_class(key).new(key, _parent: self, config: config)
          end
        end
      end
    end
  end
end