lib/openwfe/engine/engine.rb in openwferu-0.9.11 vs lib/openwfe/engine/engine.rb in openwferu-0.9.12

- old
+ new

@@ -41,19 +41,22 @@ # require 'logger' require 'fileutils' -require 'openwfe/workitem' +require 'openwfe/omixins' require 'openwfe/rudefinitions' require 'openwfe/service' +require 'openwfe/workitem' require 'openwfe/util/irb' require 'openwfe/util/scheduler' require 'openwfe/util/schedulers' require 'openwfe/expool/wfidgen' require 'openwfe/expool/expressionpool' require 'openwfe/expool/expstorage' +require 'openwfe/expool/errorjournal' +require 'openwfe/expressions/environment' require 'openwfe/expressions/expressionmap' require 'openwfe/participants/participantmap' module OpenWFE @@ -62,10 +65,11 @@ # The simplest implementation of the OpenWFE workflow engine. # No persistence is used, everything is stored in memory. # class Engine < Service include OwfeServiceLocator + include FeiMixin # # Builds an OpenWFEru engine. # # Accepts an optional initial application_context (containing @@ -80,29 +84,57 @@ super(S_ENGINE, application_context) $OWFE_LOG = application_context[:logger] unless $OWFE_LOG + #puts "Creating logs in " + FileUtils.pwd FileUtils.mkdir("logs") unless File.exist?("logs") - $OWFE_LOG = Logger.new("logs/openwferu.log") + $OWFE_LOG = Logger.new("logs/openwferu.log", 10, 1024000) + $OWFE_LOG.level = Logger::INFO end # build order matters. # # especially for the expstorage which 'observes' the expression # pool and thus needs to be instantiated after it. build_scheduler() + # + # for delayed or repetitive executions (it's the engine's clock) + # see http://openwferu.rubyforge.org/scheduler.html build_expression_map() + # + # mapping expression names ('sequence', 'if', 'concurrence', + # 'when'...) to their implementations (SequenceExpression, + # IfExpression, ConcurrenceExpression, ...) build_wfid_generator() + # + # the workflow instance (process instance) id generator + # making sure each process instance has a unique identifier + build_expression_pool() + # + # the core (hairy ball) of the engine + build_expression_storage() + # + # the engine persistence (persisting the expression instances + # that make up process instances) build_participant_map() + # + # building the services that maps participant names to + # participant implementations / instances. + build_error_journal() + # + # builds the error journal (keeping track of failures + # in business process executions, and an opportunity to + # fix and replay) + linfo { "new() --- engine started --- #{self.object_id}" } end # # Call this method once the participant for a persisted engine @@ -134,10 +166,14 @@ # Launches a [business] process. # The 'launch_object' param may contain either a LaunchItem instance, # either a String containing the URL of the process definition # to launch (with an empty LaunchItem created on the fly). # + # The launch object can also be a String containing the XML process + # definition or directly a class extending OpenWFE::ProcessDefinition + # (Ruby process definition). + # # Returns the FlowExpressionId instance of the expression at the # root of the newly launched process. # def launch (launch_object) @@ -161,11 +197,16 @@ end li end - get_expression_pool.launch launchitem + fei = get_expression_pool.launch launchitem + + fei.dup + # + # so that users of this launch() method can play with their + # fei without breaking things end # # This method is used to feed a workitem back to the engine (after # it got sent to a worklist or wherever by a participant). @@ -291,10 +332,25 @@ get_scheduler.join end # + # Calling this method makes the control flow block until the + # workflow engine is inactive. + # + # TODO : implement idle_for + # + def join_until_idle () + + storage = get_expression_storage + + while storage.size > 1 + sleep 1 + end + end + + # # Enabling the console means that hitting CTRL-C on the window / # term / dos box / whatever does run the OpenWFEru engine will # open an IRB interactive console for directly manipulating the # engine instance. # @@ -344,129 +400,176 @@ nil end # + # Waits for a given process instance to terminate. + # The method only exits when the flow terminates, but beware : if + # the process already terminated, the method will never exit. + # + # The parameter can be a FlowExpressionId instance, for example the + # one given back by a launch(), or directly a workflow instance id + # (String). + # + # This method is mainly used in utests. + # + def wait_for (fei_or_wfid) + + wfid = if fei_or_wfid.kind_of?(FlowExpressionId) + fei_or_wfid.workflow_instance_id + else + fei_or_wfid + end + + #Thread.pass + # # + # # let the flow 'stabilize' or progress before enquiring + #fexp = get_expression_pool.fetch_expression(fei) + #return unless fexp + # + # doesn't work well + + t = Thread.new { Thread.stop } + + get_expression_pool.add_observer(:terminate) do |channel, fe, wi| + t.wakeup if fe.fei.workflow_instance_id == wfid + end + + ldebug { "wait_for() #{wfid}" } + + t.join + end + + # + # Returns a hash of ProcessStatus instances. The keys of the hash + # are workflow instance ids. + # + # A ProcessStatus is a description of the state of a process instance + # it enumerates the expressions where the process is currently + # located (waiting certainly) and the errors the process currently + # has (hopefully none). + # + def get_process_status (wfid=nil) + + wfid = to_wfid(wfid) if wfid + + result = {} + + get_expression_storage.real_each(wfid) do |fei, fexp| + next if fexp.kind_of?(Environment) + next unless fexp.apply_time + (result[fei.parent_wfid] ||= ProcessStatus.new) << fexp + end + + result.values.each do |ps| + get_error_journal.get_error_log(ps.wfid).each do |error| + ps << error + end + end + + class << result + def to_s + pretty_print_process_status(self) + end + end + + result + end + + #-- # METHODS FROM THE EXPRESSION POOL # # These methods are 'proxy' to method found in the expression pool. # They are made available here for a simpler model. - # + #++ # # Returns the list of applied expressions belonging to a given # workflow instance. # May be used to determine where a process instance currently is. # - def get_flow_position (workflow_instance_id) - get_expression_pool.get_flow_position(workflow_instance_id) + # This method returns all the expressions (the stack) a process + # went through to reach its current state. + # + def get_process_stack (workflow_instance_id) + + get_expression_pool.get_process_stack(workflow_instance_id) end - alias :get_process_position :get_flow_position + alias :get_flow_stack :get_process_stack # - # Lists all workflows (processes) currently in the expool (in + # Lists all workflow (process) instances currently in the expool (in # the engine). # This method will return a list of "process-definition" expressions # (root of flows). # - # If consider_subprocesses is set to true, "process-definition" - # expressions of subprocesses will be returned as well. - # # "wfid_prefix" allows your to query for specific workflow instance # id prefixes. # - def list_workflows (consider_subprocesses=false, wfid_prefix=nil) + # If consider_subprocesses is set to true, "process-definition" + # expressions of subprocesses will be returned as well. + # + def list_processes (consider_subprocesses=false, wfid_prefix=nil) - get_expression_pool.list_workflows( + get_expression_pool.list_processes( consider_subprocesses, wfid_prefix) end - alias :list_processes :list_workflows + alias :list_workflows :list_processes # # Given any expression of a process, cancels the complete process # instance. # - def cancel_flow (exp_or_wfid) - get_expression_pool.cancel_flow(exp_or_wfid) + def cancel_process (exp_or_wfid) + + get_expression_pool.cancel_process(exp_or_wfid) end - alias :cancel_process :cancel_flow + alias :cancel_flow :cancel_process # # Cancels the given expression (and its children if any) # (warning : advanced method) # + # Cancelling the root expression of a process is equivalent to + # cancelling the process. + # def cancel_expression (exp_or_fei) - get_expression_pool.cancel(exp_or_fei) + + get_expression_pool.cancel_expression(exp_or_fei) end # # Forgets the given expression (make it an orphan) # (warning : advanced method) # def forget_expression (exp_or_fei) + get_expression_pool.forget(exp_or_fei) end - # - # Waits for a given process instance to terminate. - # The method only exits when the flow terminates, but beware : if - # the process already terminated, the method will never exit. - # - # The parameter can be a FlowExpressionId instance, for example the - # one given back by a launch(), or directly a workflow instance id - # (String). - # - # This method is mainly used in utests. - # - def wait_for (fei_or_wfid) - - wfid = if fei_or_wfid.kind_of?(FlowExpressionId) - fei_or_wfid.workflow_instance_id - else - fei_or_wfid - end - - #Thread.pass - # # - # # let the flow 'stabilize' or progress before enquiring - #fexp = get_expression_pool.fetch_expression(fei) - #return unless fexp - # - # doesn't work well - - t = Thread.new { Thread.stop } - - get_expression_pool.add_observer(:terminate) do |channel, fe, wi| - t.wakeup if fe.fei.workflow_instance_id == wfid - end - - ldebug { "wait_for() #{wfid}" } - - t.join - end - protected - # + #-- # the following methods may get overridden upon extension # see for example file_persisted_engine.rb - # + #++ def build_expression_map @application_context[S_EXPRESSION_MAP] = ExpressionMap.new # # the expression map is not a Service anymore, # it's a simple instance (that will be reused in other # OpenWFEru components) - - #ldebug do - # "build_expression_map() :\n" + - # get_expression_map.to_s - #end end + # + # This implementation builds a KotobaWfidGenerator instance and + # binds it in the engine context. + # There are other WfidGeneration implementations available, like + # UuidWfidGenerator or FieldWfidGenerator. + # def build_wfid_generator #init_service S_WFID_GENERATOR, DefaultWfidGenerator #init_service S_WFID_GENERATOR, UuidWfidGenerator init_service S_WFID_GENERATOR, KotobaWfidGenerator @@ -477,29 +580,199 @@ # showing how to initialize a FieldWfidGenerator that # will take as workflow instance id the value found in # the field "wfid" of the LaunchItem. end + # + # Builds the OpenWFEru expression pool (the core of the engine) + # and binds it in the engine context. + # There is only one implementation of the expression pool, so + # this method is usually never overriden. + # def build_expression_pool init_service(S_EXPRESSION_POOL, ExpressionPool) end + # + # The implementation here builds an InMemoryExpressionStorage + # instance. + # + # See FilePersistedEngine or CachedFilePersistedEngine for + # overrides of this method. + # def build_expression_storage init_service(S_EXPRESSION_STORAGE, InMemoryExpressionStorage) end + # + # The ParticipantMap is a mapping between participant names + # (well rather regular expressions) and participant implementations + # (see http://openwferu.rubyforge.org/participants.html) + # def build_participant_map init_service(S_PARTICIPANT_MAP, ParticipantMap) end + # + # There is only one Scheduler implementation, that's the one + # built and bound here. + # def build_scheduler init_service(S_SCHEDULER, SchedulerService) end + # + # The default implementation of this method uses an + # InMemoryErrorJournal (do not use in production). + # + def build_error_journal + + init_service(S_ERROR_JOURNAL, InMemoryErrorJournal) + end + + end + + # + # ProcessStatus gathers information about the status of a business + # process instance. + # + # The status is mainly a list of expressions and a hash of errors. + # + # Instances of this class are obtained via Engine.process_status(). + # + class ProcessStatus + + # + # the String workflow instance id of the Process. + # + attr_reader :wfid + + # + # The list of the expressions currently active in the process instance. + # + # For instance, if your process definition is currently in a + # concurrence, more than one expressions may be listed here. + # + attr_reader :expressions + + # + # a hash whose values are ProcessError instances, the keys + # are FlowExpressionId instances (fei) (identifying the expressions + # that are concerned with the error) + # + attr_reader :errors + + def initialize + @wfid = nil + @expressions = [] + @errors = {} + end + + # + # this method is used by Engine.get_process_status() when + # it prepares its results. + # + def << (item) + + if item.kind_of?(FlowExpression) + add_expression item + else + add_error item + end + end + + # + # A String representation, handy for debugging, quick viewing. + # + def to_s + s = "" + s << "-- #{self.class.name} --\n" + s << " wfid : #{@wfid}\n" + s << " expressions :\n" + @expressions.each do |fexp| + s << " #{fexp.fei}\n" + end + s << " errors : #{@errors.size}" + s + end + + protected + + def add_expression (fexp) + + set_wfid fexp.fei.parent_wfid + + #@expressions << fexp + + exps = @expressions + @expressions = [] + + added = false + @expressions = exps.collect do |fe| + if added or fe.fei.wfid != fexp.fei.wfid + fe + else + if OpenWFE::starts_with(fexp.fei.expid, fe.fei.expid) + added = true + fexp + elsif OpenWFE::starts_with(fe.fei.expid, fexp.fei.expid) + added = true + fe + else + fe + end + end + end + @expressions << fexp unless added + end + + def add_error (error) + @errors[error.fei] = error + end + + def set_wfid (wfid) + + return if @wfid + @wfid = wfid + end + end + + # + # Renders a nice, terminal oriented, representation of an + # Engine.get_process_status() result. + # + # You usually directly benefit from this when doing + # + # puts engine.get_process_status.to_s + # + def pretty_print_process_status (ps) + + s = "" + s << "process_id | name | rev | brn | err\n" + s << "--------------------+-------------------+---------+-----+-----\n" + + ps.keys.sort.each do |wfid| + + status = ps[wfid] + fexp = status.expressions[0] + ffei = fexp.fei + + s << "%-19s" % wfid[0, 19] + s << " | " + s << "%-17s" % ffei.workflow_definition_name[0, 17] + s << " | " + s << "%-7s" % ffei.workflow_definition_revision[0, 7] + s << " | " + s << "%3s" % status.expressions.size.to_s[0, 3] + s << " | " + s << "%3s" % status.errors.size.to_s[0, 3] + s << "\n" + end + s end end