lib/openwfe/expool/expressionpool.rb in openwferu-0.9.9 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.10

- old
+ new

@@ -49,10 +49,11 @@ require 'openwfe/logging' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' require 'openwfe/util/lru' require 'openwfe/util/safe' +require 'openwfe/util/workqueue' require 'openwfe/util/observable' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw_xml' require 'openwfe/expressions/raw_prog' require 'openwfe/expressions/simplerep' @@ -128,11 +129,12 @@ class ExpressionPool include \ ServiceMixin, MonitorMixin, OwfeServiceLocator, - Observable + Observable, + WorkqueueMixin SAFETY_LEVEL = 2 # # code loaded from a remote URI will get evaluated with # that security level @@ -146,17 +148,20 @@ @monitors = MonitorProvider.new(application_context) @observers = {} @stopped = false + + start_workqueue end # # Makes sure to call the do_stop() method of the Stoppable mixin # def stop @stopped = true + stop_workqueue onotify :stop end # # Obtains a unique monitor for an expression. @@ -202,49 +207,38 @@ end # # Instantiates a workflow definition and launches it. # - # If async is set to true, the launch will occur in its own thread. + # This method call will return immediately, it could even return + # before the actual launch is completely over. # # Returns the FlowExpressionId instance of the root expression of # the newly launched flow. - # If async is set to true, returns a tuple FlowExpressionId / - # Thread instance used. # - def launch (launchitem, async=false) + def launch (launchitem) - raw_expression = prepare_raw_expression(launchitem) + raw_expression = prepare_raw_expression launchitem # # will raise an exception if there are requirements # and one of them is not met raw_expression.new_environment() # # as this expression is the root of a new process instance, # it has to have an environment for all the variables of # the process instance - wi = build_workitem(launchitem) + wi = build_workitem launchitem fei = raw_expression.fei - onotify :launch, launchitem.workflow_definition_url + onotify :launch, fei, launchitem - if async + apply raw_expression, wi - t = OpenWFE::call_in_thread "launch()", self do - apply(raw_expression, wi) - end - - return fei, t - end - - apply(raw_expression, wi) - - return fei - #return fei, Thread.current + fei end # # Prepares a raw expression from a template. # Returns that raw expression. @@ -264,14 +258,10 @@ end #raise "did not find expression at #{template.to_s}" \ # unless rawexp - ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" } - - onotify :launch_template, rawexp.fei - rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() if requesting_expression == nil @@ -320,10 +310,12 @@ rawexp = prepare_from_template( requesting_expression, sub_id, template, params) workitem.flow_expression_id = rawexp.fei + onotify :launch_template, rawexp.fei, workitem + apply rawexp, workitem rawexp.fei end @@ -347,30 +339,21 @@ # # Applies a given expression (id or expression) # def apply (exp, workitem) - exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId + #do_apply exp, workitem + queue_work :do_apply, exp, workitem + end - #ldebug { "apply() '#{fei}' (#{fei.class})" } + # + # Replies to a given expression + # + def reply (exp, workitem) - if not exp - lwarn { "apply() cannot apply missing #{fei.to_debug_s}" } - return - end - - #ldebug { "apply() #{fei.to_debug_s}" } - - #exp.apply_time = OpenWFE::now() - # - # this is done in RawExpression - - workitem.flow_expression_id = exp.fei - - onotify :apply, exp.fei, workitem - - exp.apply(workitem) + #do_reply exp, workitem + queue_work :do_reply, exp, workitem end # # Cancels the given expression. # The param might be an expression instance or a FlowExpressionId @@ -385,11 +368,11 @@ return nil end ldebug { "cancel() for #{fei.to_debug_s}" } - onotify :cancel, fei + onotify :cancel, exp inflowitem = exp.cancel() remove(exp) inflowitem @@ -399,11 +382,11 @@ # Given any expression of a process, cancels the complete process # instance. # def cancel_flow (exp_or_wfid) - ldebug { "cancel_flow() from #{exp_or_wfid}" } + #ldebug { "cancel_flow() from #{exp_or_wfid}" } root = fetch_root(exp_or_wfid) cancel(root) end alias :cancel_process :cancel_flow @@ -418,11 +401,11 @@ #ldebug { "forget() forgetting #{fei}" } return if not exp - onotify :forget, fei + onotify :forget, exp parent_exp.children.delete(fei) exp.parent_id = GONE_PARENT_ID exp.dup_environment @@ -438,11 +421,11 @@ ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } workitem.last_expression_id = exp.fei - onotify :reply_to_parent, exp.fei, workitem + onotify :reply_to_parent, exp, workitem if remove remove(exp) # @@ -463,16 +446,17 @@ # # flow terminated ? if not exp.parent_id + ldebug do "reply_to_parent() process " + "#{exp.fei.workflow_instance_id} terminated" end - onotify :terminate, exp.fei + onotify :terminate, exp, workitem return end # @@ -492,31 +476,10 @@ reply exp.parent_id, workitem end # - # Triggers the reply expression of the expression given by its id. - # - def reply (exp, workitem) - - exp, fei = fetch(exp) - - ldebug { "reply() to #{fei.to_debug_s}" } - ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" } - - if not exp - #raise "cannot reply to missing #{fei.to_debug_s}" - lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" } - return - end - - onotify :reply, fei, workitem - - exp.reply(workitem) - end - - # # Adds or updates a flow expression in this pool # def update (flow_expression) #ldebug { "update() for #{flow_expression.fei.to_debug_s}" } @@ -752,12 +715,62 @@ list_workflows(false, wfid)[0] end protected - #def evaluate_definition (raw_definition, workitem) - # expression = raw_definition.instantiate(workitem) - #end + def do_process_workelement elt + + message, fei, workitem = elt + send message, fei, workitem + end + + # + # The real apply work. + # + def do_apply (exp, workitem) + + exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId + + #ldebug { "apply() '#{fei}' (#{fei.class})" } + + if not exp + lwarn { "apply() cannot apply missing #{fei.to_debug_s}" } + return + end + + #ldebug { "apply() #{fei.to_debug_s}" } + + #exp.apply_time = OpenWFE::now() + # + # this is done in RawExpression + + workitem.flow_expression_id = exp.fei + + onotify :apply, exp, workitem + + exp.apply(workitem) + end + + # + # The real reply work is done here + # + def do_reply (exp, workitem) + + exp, fei = fetch(exp) + + ldebug { "reply() to #{fei.to_debug_s}" } + ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" } + + if not exp + #raise "cannot reply to missing #{fei.to_debug_s}" + lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" } + return + end + + onotify :reply, exp, workitem + + exp.reply(workitem) + end # # Removes an environment, especially takes care of unbinding # any special value it may contain. #