lib/openwfe/engine/engine.rb in openwferu-0.9.13 vs lib/openwfe/engine/engine.rb in openwferu-0.9.14

- old
+ new

@@ -141,10 +141,11 @@ # If this method is called too soon, missing participants will # cause trouble... Call this method after all the participants # have been added. # def reschedule + get_expression_pool.reschedule() end alias :reload :reschedule @@ -156,10 +157,11 @@ # # Note that the launch method will raise those exceptions as well. # This method can be useful in some scenarii though. # def pre_launch_check (launchitem) + get_expression_pool.prepare_raw_expression(launchitem) end # # Launches a [business] process. @@ -172,36 +174,33 @@ # (Ruby process definition). # # Returns the FlowExpressionId instance of the expression at the # root of the newly launched process. # - def launch (launch_object) + # Options for scheduled launches like :at, :in and :cron are accepted + # via the 'options' optional parameter. + # For example : + # + # engine.launch(launch_item) + # # will launch immediately + # + # engine.launch(launch_item, :in => "1d20m") + # # will launch in one day and twenty minutes + # + # engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007") + # # will launch at that point in time + # + # engine.launch(launch_item, :cron => "0 5 * * *") + # # will launch that same process every day, + # # five minutes after midnight (see "man 5 crontab") + # + def launch (launch_object, options={}) - launchitem = if launch_object.kind_of?(OpenWFE::LaunchItem) + launchitem = extract_launchitem launch_object - launch_object + fei = get_expression_pool.launch launchitem, options - elsif launch_object.kind_of?(Class) - - LaunchItem.new launch_object - - elsif launch_object.kind_of? String - - li = OpenWFE::LaunchItem.new - - if launch_object[0, 1] == '<' - li.workflowDefinitionUrl = "field:__definition" - li['definition'] = launch_object - else - li.workflowDefinitionUrl = launch_object - end - - li - end - - fei = get_expression_pool.launch launchitem - fei.dup # # so that users of this launch() method can play with their # fei without breaking things end @@ -229,10 +228,13 @@ "engine.reply() " + "cannot handle instances of #{workitem.class}" end end + alias :forward :reply + alias :proceed :reply + # # Registers a participant in this [embedded] engine. # This method is a shortcut to the ParticipantMap method # with the same name. # @@ -420,22 +422,22 @@ end t = Thread.new { Thread.stop } to = get_expression_pool.add_observer(:terminate) do |c, fe, wi| - t.wakeup if fe.fei.workflow_instance_id == wfid + t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?) end - te = get_expression_pool.add_observer(:error) do |c, fei, m, wi, se| - t.wakeup if fei.parent_wfid == wfid + te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e| + t.wakeup if (fei.parent_wfid == wfid and t.alive?) end - ldebug { "wait_for() #{wfid}" } + linfo { "wait_for() #{wfid}" } t.join - get_expression_pool.remove_observer to, :terminate - get_expression_pool.remove_observer te, :error + get_expression_pool.remove_observer(to, :terminate) + get_expression_pool.remove_observer(te, :error) # # it would work as well without specifying the channel, # but it's thus a little bit faster end @@ -446,11 +448,11 @@ # A ProcessStatus is a description of the state of a process instance. # It enumerates the expressions where the process is currently # located (waiting certainly) and the errors the process currently # has (hopefully none). # - def get_process_status (wfid=nil) + def list_process_status (wfid=nil) wfid = to_wfid(wfid) if wfid result = {} @@ -473,10 +475,25 @@ end result end + # + # list_process_status() will be deprecated at release 1.0.0 + # + alias :get_process_status :list_process_status + + # + # Returns the process status of one given process instance. + # + def process_status (wfid) + + wfid = to_wfid(wfid) + + list_process_status(wfid).values[0] + end + #-- # METHODS FROM THE EXPRESSION POOL # # These methods are 'proxy' to method found in the expression pool. # They are made available here for a simpler model. @@ -544,10 +561,52 @@ def forget_expression (exp_or_fei) get_expression_pool.forget(exp_or_fei) end + # + # Pauses a process (sets its /__paused__ variable to true). + # + def pause_process (wfid) + + get_expression_pool.pause_process(wfid) + end + + # + # Restarts a process : removes its 'paused' flag (variable) and makes + # sure to 'replay' events (replies) that came for it while it was + # in pause. + # + def resume_process (wfid) + + get_expression_pool.resume_process(wfid) + end + + # + # Looks up a process variable in a process. + # If fei_or_wfid is not given, will simply look in the + # 'engine environment' (where the top level variables '//' do reside). + # + def lookup_variable (var_name, fei_or_wfid=nil) + + return get_expression_pool.fetch_engine_environment[var_name] \ + unless fei_or_wfid + + exp = if fei_or_wfid.is_a?(String) + + get_expression_pool.fetch_root(fei_or_wfid) + + else + + get_expression_pool.fetch_expression(fei_or_wfid) + end + + raise "no expression found for '#{fei_or_wfid.to_s}'" unless exp + + exp.lookup_variable var_name + end + protected #-- # the following methods may get overridden upon extension # see for example file_persisted_engine.rb @@ -631,10 +690,42 @@ def build_error_journal init_service(S_ERROR_JOURNAL, InMemoryErrorJournal) end + # + # Turns the raw launch request info into a LaunchItem instance. + # + def extract_launchitem (launch_object) + + if launch_object.kind_of?(OpenWFE::LaunchItem) + + launch_object + + elsif launch_object.kind_of?(Class) + + LaunchItem.new launch_object + + elsif launch_object.kind_of?(String) + + li = OpenWFE::LaunchItem.new + + #if launch_object[0, 1] == '<' or launch_object.match("\n") + if launch_object[0, 1] == '<' or launch_object.index("\n") + + li.workflow_definition_url = "field:__definition" + li['__definition'] = launch_object + + else + + li.workflow_definition_url = launch_object + end + + li + end + end + end # # ProcessStatus represents information about the status of a workflow # process instance. @@ -657,11 +748,11 @@ # concurrence, more than one expressions may be listed here. # attr_reader :expressions # - # a hash whose values are ProcessError instances, the keys + # A hash whose values are ProcessError instances, the keys # are FlowExpressionId instances (fei) (identifying the expressions # that are concerned with the error) # attr_reader :errors @@ -670,10 +761,19 @@ @expressions = [] @errors = {} end # + # Returns true if the process is in pause. + # + def paused? + + exp = @expressions[0] + exp != nil and exp.paused? + end + + # # this method is used by Engine.get_process_status() when # it prepares its results. # def << (item) @@ -693,11 +793,12 @@ s << " wfid : #{@wfid}\n" s << " expressions :\n" @expressions.each do |fexp| s << " #{fexp.fei}\n" end - s << " errors : #{@errors.size}" + s << " errors : #{@errors.size}\n" + s << " paused : #{paused?}" s end protected @@ -749,12 +850,12 @@ # puts engine.get_process_status.to_s # def pretty_print_process_status (ps) s = "" - s << "process_id | name | rev | brn | err\n" - s << "--------------------+-------------------+---------+-----+-----\n" + s << "process_id | name | rev | brn | err | paused? \n" + s << "--------------------+-------------------+---------+-----+-----+---------\n" ps.keys.sort.each do |wfid| status = ps[wfid] fexp = status.expressions[0] @@ -767,9 +868,11 @@ s << "%-7s" % ffei.workflow_definition_revision[0, 7] s << " | " s << "%3s" % status.expressions.size.to_s[0, 3] s << " | " s << "%3s" % status.errors.size.to_s[0, 3] + s << " | " + s << "%5s" % status.paused?.to_s s << "\n" end s end