module Eco
  module API
    class Session
      class Config
        # A _workflow_ is a tree of named _stages_. Each stage holds an optional `on` callback,
        # any number of `before` / `after` callbacks and an optional `rescue` callback, all of
        # them registered at **configuration** time and executed by `run` at **run** time.
        # The allowed stage names (and their nesting) are declared in `WORKFLOW_MODEL`.
        class Workflow
          # NOTE(review): `model`, `model=`, `model_attrs`, `to_constant` and `new_class` are
          # presumably provided by this module; it is not visible from this file.
          extend Eco::API::Common::ClassHierarchy

          # Allowed stages, in execution order. A `Hash` entry declares a stage with sub-stages.
          WORKFLOW_MODEL = [
            :options,
            {load: [{input: [:get, :filter]}, {people: [:get, :filter]}, :filter]},
            :usecases,
            :launch_jobs,
            {post_launch: [:usecases, :launch_jobs]},
            :report,
            :end,
            :close
          ]

          class << self
            # @return [Array] the stage keys allowed directly under this workflow class
            #   (whatever `model_attrs` returns for the current `model`).
            def stages
              model_attrs
            end

            # @return [Object] the inherited `model`, or an empty `Hash` when there is none.
            def model
              super || {}
            end

            # Checks `stage` against `stages`.
            # @note this method only **builds** the message; it is up to the caller to raise.
            # @param stage [Symbol] the stage key to check.
            # @return [String, nil] an error message if `stage` is unknown, `nil` otherwise.
            def validate_stage(stage)
              return if stages.include?(stage)

              "Unknown Workflow stage '#{stage}'. Should be any of #{stages}"
            end

            # Builds the `Workflow` subclass used for the sub-stage `key`, and assigns to it
            # the part of this class' `model` found under `key`.
            # @param key [Symbol] the sub-stage key.
            # @return [Class] whatever `new_class` returns (presumably the new subclass — confirm).
            def workflow_class(key)
              class_name = to_constant(key.to_s)

              new_class(class_name, inherits: Eco::API::Session::Config::Workflow) do |klass|
                klass.model = model[key]
              end
            end
          end

          self.model = WORKFLOW_MODEL

          attr_reader :config
          attr_reader :name

          # @param name [Symbol, nil] the key of this stage.
          # @param _parent [Eco::API::Session::Config::Workflow] the parent stage.
          #   It defaults to the object itself, which is what marks it as the `root?` stage.
          # @param config [Object] the configuration object, handed down to every sub-stage.
          def initialize(name = nil, _parent: self, config:)
            @config  = config
            @name    = name
            @stages  = {}
            @_parent = _parent
            @pending = true
            # `@skip` is deliberately left undefined: `skip?` relies on `instance_variable_defined?`

            # moments
            @on     = nil
            @before = []
            @after  = []
          end

          # Has this stage run yet?
          # @note it does **not** include _sub-stages_ that run `before`
          # @return [Boolean] `true` while it has not run (nor been skipped with `skip!`), `false` otherwise
          def pending?
            @pending
          end

          # Do not run this stage!
          # @note it also marks the stage as no longer `pending?`.
          def skip!
            @skip    = true
            @pending = false
          end

          # Has this stage been marked as to be skipped
          # @return [Boolean] depends on this order:
          #   - `true` if `skip!` was called
          #   - `false` if the current _stage_ is `root?` (the top stage of the hierarchy)
          #   - otherwise, whatever its parent stage answers to `skip?`
          def skip?
            return @skip if instance_variable_defined?(:@skip)
            return false if root?

            @_parent.skip?
          end

          # Used in **configuration** time **to configure** the _workflow_ of the target (sub)stage `key`
          # @note the `block` is mandatory and the target stage is yielded to it immediately
          # @raise [RuntimeError] if no `block` is given.
          # @raise [ArgumentError] if `key` is not a known sub-stage.
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow] further _workflow_ configuration `for` the target stage `key`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def for(key = nil, &block)
            raise "A block should be given." unless block

            if key
              # The block is captured explicitly: the block-less `Proc.new` form used
              # previously raises on Ruby 3.0 or later.
              stage(key).for(&block)
            else
              yield(self)
            end

            self
          end

          # Used in **configuration** time **to define** the **behaviour** the target (sub)stage `key`
          # @note the `block` is **not** called immediately, but when the _workflow_ reaches the stage
          # @raise [RuntimeError] if no `block` is given.
          # @raise [ArgumentError] if `key` is not a known sub-stage.
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow, io] the behaviour of the target stage `key` when the _workflow_ reaches it
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] the `io` input/output object carried throughout all the _workflow_
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def on(key = nil, &block)
            raise "A block should be given." unless block

            if key
              stage(key).on(&block)
            else
              @on = block
            end

            self
          end

          # When there is an `Exception`, you might have defined some `callback` to do something with it
          # (i.e. register, email)
          # @note when called **without** a `block` it works as a reader of the current `callback`.
          # @yield [exception, io] the `callback` to do something with an `Exception` raised within this _workflow_ stage
          # @yieldparam exception [Exception] the exception object that was raised
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @return [Eco::API::Session::Config::Workflow, Proc, nil]
          #   - with a `block`: the current stage object (to ease chaining)
          #   - without a `block`: the `callback` previously defined (`nil` if there is none)
          def rescue(&block)
            return @rescue unless block

            @rescue = block
            self
          end

          # Used in **configuration** time to **add** `callbacks` to run **before** the `on` _callback_
          # of the (sub)stage `key`
          # @note
          #   - the `block` is **not** called immediately, but when the _workflow_ reaches the target stage
          #   - in this case, you can define multiple `callbacks`
          # @raise [RuntimeError] if no `block` is given.
          # @raise [ArgumentError] if `key` is not a known sub-stage.
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow, io] one of the things to do **before** the `on` _callback_ of the (sub)stage `key`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] `io` the input/output object carried throughout all the _workflow_
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def before(key = nil, &block)
            raise "A block should be given." unless block

            if key
              stage(key).before(&block)
            else
              @before.push(block)
            end

            self
          end

          # Used in **configuration** time to **add** `callbacks` to run **after** the `on` _callback_
          # of the (sub)stage `key`
          # @note
          #   - the `block` is **not** called immediately, but when the _workflow_ reaches the target stage
          #   - in this case, you can define multiple `callbacks`
          # @raise [RuntimeError] if no `block` is given.
          # @raise [ArgumentError] if `key` is not a known sub-stage.
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @yield [stage_workflow, io] one of the things to do **after** the `on` _callback_ of the (sub)stage `key`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] `io` the input/output object carried throughout all the _workflow_
          # @return [Eco::API::Session::Config::Workflow] the current stage object (to ease chaining).
          def after(key = nil, &block)
            raise "A block should be given." unless block

            if key
              stage(key).after(&block)
            else
              @after.push(block)
            end

            self
          end

          # Used in run time to **execute the workflow** of the (sub)stage `key`
          # @note nothing is executed for a stage that is no longer `pending?`.
          # @note if a `block` is **not** provided:
          #   - it will run the `before` _callbacks_ defined during the configuration time
          #   - it will run the _workflow_ of any defined _**substage**_ of the `key` stage
          #   - it will run the `on` _callback_ defined during the configuration time
          #   - it will mark the stage as **not** `pending?`.
          #   - it will run the `after` _callbacks_ defined during the configuration time
          # @note if a `block` is provided:
          #   - it will **not** run the workflow of the substages to `key` stage
          #   - it will **not** run the `callback` for `on` defined during the configuration time
          #   - it will rather call the `block` with the target stage after all the `before` _callbacks_ have been run
          #   - aside of this, the rest will be the same as when the _block_ is **not** provided (see previous note)
          # @note if the stage is to be skipped (see `skip?`), only its `before` and `after` _callbacks_ are run.
          # @note any exception is handed to the `rescue` _callback_ (if defined) and then re-raised;
          #   `SystemExit` and `Interrupt` are re-raised straight away, without calling it.
          # @raise [RuntimeError] if `io` is not an `Eco::API::UseCases::BaseIO`.
          # @raise [ArgumentError] if the object returned by `before` and `after` callbacks is not an
          #   `Eco::API::UseCases::BaseIO`, or if `key` is not a known sub-stage.
          # @param key [Symbol, nil] cases:
          #   - if `key` is not provided, it targets the _current stage_
          #   - if `key` is provided, it targets the specific _sub-stage_
          # @param io [Eco::API::UseCases::BaseIO] the input/output object
          # @yield [stage_workflow, io] if a `block` is provided, see `note`
          # @yieldparam stage_workflow [Eco::API::Session::Config::Workflow] the _target stage_ referred by `key`
          # @yieldparam io [Eco::API::UseCases::BaseIO] the input/output object carried throughout all the _workflow_
          # @yieldreturn [Eco::API::UseCases::BaseIO] the `io` input/output object carried throughout all the _workflow_
          # @return [Eco::API::UseCases::BaseIO] the `io` object resulting from the last step that was run.
          def run(key = nil, io:, &block)
            raise "Missing BaseIO object" unless io.is_a?(Eco::API::UseCases::BaseIO)

            begin
              if key
                io = stage(key).run(io: io, &block)
              elsif pending?
                # `io` is re-assigned after every single callback, so the `rescue` callback
                # below always receives the latest valid `io`.
                @before.each { |callback| io = checked_callback(callback, :before, io) }

                unless skip?
                  io.session.logger.debug("(Workflow: #{path}) running now")

                  if block
                    io = block.call(self, io)
                  else
                    existing_stages.each { |stg| io = stg.run(io: io) }

                    if ready?
                      io = @on.call(self, io)
                    else
                      msg = "(Workflow: #{path}) 'on' callback is not defined, nor block given"
                      io.session.logger.debug(msg)
                    end
                  end

                  @pending = false
                end

                @after.each { |callback| io = checked_callback(callback, :after, io) }
              end
            rescue SystemExit, Interrupt
              # Re-raise as is (a bare `exit` here would reset the exit status to 0).
              raise
            rescue Exception => e # rubocop:disable Lint/RescueException
              # Deliberately broad: the callback is only notified, the exception is always re-raised.
              self.rescue.call(e, io) if self.rescue
              raise
            end

            io
          end

          protected

          # @return [String, Symbol, nil] the `name` of the stage prefixed by the path of its
          #   ancestors (`:` separated); just `name` for the `root?` stage.
          def path
            return name if root?

            "#{@_parent.path}:#{name}"
          end

          # @return [Boolean] `true` if this stage is its own parent (top of the hierarchy).
          def root?
            @_parent == self
          end

          # @return [Boolean] `true` if an `on` callback has been defined for this stage.
          def ready?
            !!@on
          end

          # @return [Array<Eco::API::Session::Config::Workflow>] the sub-stages created so far,
          #   sorted by the order of `self.class.stages`.
          def existing_stages
            # sort defined stages by stage order
            sorted_keys = self.class.stages & @stages.keys
            sorted_keys.map { |k| stage(k) }
          end

          # @return [Array<Eco::API::Session::Config::Workflow>] the `existing_stages` that are `ready?`.
          def ready_stages
            # Explicit block: `ready?` is protected, so it is called from within an instance method.
            existing_stages.select { |s| s.ready? }
          end

          # @return [Boolean] `true` if every one of the `existing_stages` is `ready?`.
          def stages_ready?
            existing_stages.all? { |s| s.ready? }
          end

          # Retrieves the sub-stage `key`, creating it on first access.
          # @raise [ArgumentError] if `key` is not one of `self.class.stages`.
          # @param key [Symbol] the sub-stage key.
          # @return [Eco::API::Session::Config::Workflow] the sub-stage object.
          def stage(key)
            # `validate_stage` only returns the message: it has to be raised here.
            error = self.class.validate_stage(key)
            raise ArgumentError, error if error

            @stages[key] ||= self.class.workflow_class(key).new(key, _parent: self, config: config)
          end

          private

          # Calls a `before` / `after` callback and checks that it returns a `BaseIO` object.
          # @raise [ArgumentError] if the `callback` returns anything else.
          # @param callback [Proc] the callback to call with `(self, io)`.
          # @param moment [Symbol] `:before` or `:after` (only used in the error message).
          # @param io [Eco::API::UseCases::BaseIO] the current input/output object.
          # @return [Eco::API::UseCases::BaseIO] the object returned by the `callback`.
          def checked_callback(callback, moment, io)
            callback.call(self, io).tap do |result|
              next if result.is_a?(Eco::API::UseCases::BaseIO)

              msg = [
                "Workflow callaback #{moment}('#{name}') should return Eco::API::UseCases::BaseIO object.",
                " Given #{result.class}",
                " • Callback source location: '#{callback.source_location}'"
              ].join
              raise ArgumentError, msg
            end
          end
        end
      end
    end
  end
end