lib/openwfe/expool/expressionpool.rb in openwferu-0.9.9 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.10
- old
+ new
@@ -49,10 +49,11 @@
require 'openwfe/logging'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
require 'openwfe/util/lru'
require 'openwfe/util/safe'
+require 'openwfe/util/workqueue'
require 'openwfe/util/observable'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw_xml'
require 'openwfe/expressions/raw_prog'
require 'openwfe/expressions/simplerep'
@@ -128,11 +129,12 @@
class ExpressionPool
include \
ServiceMixin,
MonitorMixin,
OwfeServiceLocator,
- Observable
+ Observable,
+ WorkqueueMixin
SAFETY_LEVEL = 2
#
# code loaded from a remote URI will get evaluated with
# that security level
@@ -146,17 +148,20 @@
@monitors = MonitorProvider.new(application_context)
@observers = {}
@stopped = false
+
+ start_workqueue
end
#
# Makes sure to call the do_stop() method of the Stoppable mixin
#
def stop
@stopped = true
+ stop_workqueue
onotify :stop
end
#
# Obtains a unique monitor for an expression.
@@ -202,49 +207,38 @@
end
#
# Instantiates a workflow definition and launches it.
#
- # If async is set to true, the launch will occur in its own thread.
+ # This method call will return immediately, it could even return
+ # before the actual launch is completely over.
#
# Returns the FlowExpressionId instance of the root expression of
# the newly launched flow.
- # If async is set to true, returns a tuple FlowExpressionId /
- # Thread instance used.
#
- def launch (launchitem, async=false)
+ def launch (launchitem)
- raw_expression = prepare_raw_expression(launchitem)
+ raw_expression = prepare_raw_expression launchitem
#
# will raise an exception if there are requirements
# and one of them is not met
raw_expression.new_environment()
#
# as this expression is the root of a new process instance,
# it has to have an environment for all the variables of
# the process instance
- wi = build_workitem(launchitem)
+ wi = build_workitem launchitem
fei = raw_expression.fei
- onotify :launch, launchitem.workflow_definition_url
+ onotify :launch, fei, launchitem
- if async
+ apply raw_expression, wi
- t = OpenWFE::call_in_thread "launch()", self do
- apply(raw_expression, wi)
- end
-
- return fei, t
- end
-
- apply(raw_expression, wi)
-
- return fei
- #return fei, Thread.current
+ fei
end
#
# Prepares a raw expression from a template.
# Returns that raw expression.
@@ -264,14 +258,10 @@
end
#raise "did not find expression at #{template.to_s}" \
# unless rawexp
- ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" }
-
- onotify :launch_template, rawexp.fei
-
rawexp = rawexp.dup()
rawexp.fei = rawexp.fei.dup()
if requesting_expression == nil
@@ -320,10 +310,12 @@
rawexp = prepare_from_template(
requesting_expression, sub_id, template, params)
workitem.flow_expression_id = rawexp.fei
+ onotify :launch_template, rawexp.fei, workitem
+
apply rawexp, workitem
rawexp.fei
end
@@ -347,30 +339,21 @@
#
# Applies a given expression (id or expression)
#
def apply (exp, workitem)
- exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId
+ #do_apply exp, workitem
+ queue_work :do_apply, exp, workitem
+ end
- #ldebug { "apply() '#{fei}' (#{fei.class})" }
+ #
+ # Replies to a given expression
+ #
+ def reply (exp, workitem)
- if not exp
- lwarn { "apply() cannot apply missing #{fei.to_debug_s}" }
- return
- end
-
- #ldebug { "apply() #{fei.to_debug_s}" }
-
- #exp.apply_time = OpenWFE::now()
- #
- # this is done in RawExpression
-
- workitem.flow_expression_id = exp.fei
-
- onotify :apply, exp.fei, workitem
-
- exp.apply(workitem)
+ #do_reply exp, workitem
+ queue_work :do_reply, exp, workitem
end
#
# Cancels the given expression.
# The param might be an expression instance or a FlowExpressionId
@@ -385,11 +368,11 @@
return nil
end
ldebug { "cancel() for #{fei.to_debug_s}" }
- onotify :cancel, fei
+ onotify :cancel, exp
inflowitem = exp.cancel()
remove(exp)
inflowitem
@@ -399,11 +382,11 @@
# Given any expression of a process, cancels the complete process
# instance.
#
def cancel_flow (exp_or_wfid)
- ldebug { "cancel_flow() from #{exp_or_wfid}" }
+ #ldebug { "cancel_flow() from #{exp_or_wfid}" }
root = fetch_root(exp_or_wfid)
cancel(root)
end
alias :cancel_process :cancel_flow
@@ -418,11 +401,11 @@
#ldebug { "forget() forgetting #{fei}" }
return if not exp
- onotify :forget, fei
+ onotify :forget, exp
parent_exp.children.delete(fei)
exp.parent_id = GONE_PARENT_ID
exp.dup_environment
@@ -438,11 +421,11 @@
ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }
workitem.last_expression_id = exp.fei
- onotify :reply_to_parent, exp.fei, workitem
+ onotify :reply_to_parent, exp, workitem
if remove
remove(exp)
#
@@ -463,16 +446,17 @@
#
# flow terminated ?
if not exp.parent_id
+
ldebug do
"reply_to_parent() process " +
"#{exp.fei.workflow_instance_id} terminated"
end
- onotify :terminate, exp.fei
+ onotify :terminate, exp, workitem
return
end
#
@@ -492,31 +476,10 @@
reply exp.parent_id, workitem
end
#
- # Triggers the reply expression of the expression given by its id.
- #
- def reply (exp, workitem)
-
- exp, fei = fetch(exp)
-
- ldebug { "reply() to #{fei.to_debug_s}" }
- ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" }
-
- if not exp
- #raise "cannot reply to missing #{fei.to_debug_s}"
- lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" }
- return
- end
-
- onotify :reply, fei, workitem
-
- exp.reply(workitem)
- end
-
- #
# Adds or updates a flow expression in this pool
#
def update (flow_expression)
#ldebug { "update() for #{flow_expression.fei.to_debug_s}" }
@@ -752,12 +715,62 @@
list_workflows(false, wfid)[0]
end
protected
- #def evaluate_definition (raw_definition, workitem)
- # expression = raw_definition.instantiate(workitem)
- #end
+ def do_process_workelement elt
+
+ message, fei, workitem = elt
+ send message, fei, workitem
+ end
+
+ #
+ # The real apply work.
+ #
+ def do_apply (exp, workitem)
+
+ exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId
+
+ #ldebug { "apply() '#{fei}' (#{fei.class})" }
+
+ if not exp
+ lwarn { "apply() cannot apply missing #{fei.to_debug_s}" }
+ return
+ end
+
+ #ldebug { "apply() #{fei.to_debug_s}" }
+
+ #exp.apply_time = OpenWFE::now()
+ #
+ # this is done in RawExpression
+
+ workitem.flow_expression_id = exp.fei
+
+ onotify :apply, exp, workitem
+
+ exp.apply(workitem)
+ end
+
+ #
+ # The real reply work is done here
+ #
+ def do_reply (exp, workitem)
+
+ exp, fei = fetch(exp)
+
+ ldebug { "reply() to #{fei.to_debug_s}" }
+ ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" }
+
+ if not exp
+ #raise "cannot reply to missing #{fei.to_debug_s}"
+ lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" }
+ return
+ end
+
+ onotify :reply, exp, workitem
+
+ exp.reply(workitem)
+ end
#
# Removes an environment, especially takes care of unbinding
# any special value it may contain.
#