lib/openwfe/expool/expressionpool.rb in openwferu-0.9.4 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.5
- old
+ new
@@ -47,12 +47,11 @@
require 'openwfe/utils'
require 'openwfe/service'
require 'openwfe/logging'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
-require 'openwfe/util/stoppable'
-require 'openwfe/util/lru_cache'
+require 'openwfe/util/lru'
require 'openwfe/util/observable'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw_xml'
include OpenWFE
@@ -70,11 +69,11 @@
MAX_MONITORS = 10000
def initialize (application_context=nil)
super()
@application_context = application_context
- @monitors = LRUCache.new(MAX_MONITORS)
+ @monitors = LruHash.new(MAX_MONITORS)
end
def [] (key)
synchronize do
#ldebug { "[] caller :\n" + OpenWFE::caller_to_s(8) }
@@ -125,11 +124,10 @@
class ExpressionPool
include \
ServiceMixin,
MonitorMixin,
OwfeServiceLocator,
- Stoppable,
Observable
@@last_given_instance_id = -1
#
# storing at class level the last workflow instance id given
@@ -142,20 +140,18 @@
@monitors = MonitorProvider.new(application_context)
@observers = {}
- reschedule_a_bit_later
+ @stopped = false
end
#
# Makes sure to call the do_stop() method of the Stoppable mixin
#
def stop
- # would an alias be better ?
- do_stop
-
+ @stopped = true
onotify :stop
end
#
# Obtains a unique monitor for an expression.
@@ -167,28 +163,46 @@
end
#
# Instantiates a workflow definition and launches it.
#
- def launch (launchitem)
+ # If async is set to true, the launch will occur in its own thread.
+ #
+ # Returns the FlowExpressionId instance of the root expression of
+ # the newly launched flow.
+ # If async is set to true, returns a tuple FlowExpressionId /
+ # Thread instance used.
+ #
+ def launch (launchitem, async=false)
onotify :launch, launchitem.workflow_definition_url
- rawExpression = buildRawExpression(launchitem)
+ raw_expression = buildRawExpression(launchitem)
wi = build_workitem(launchitem)
- rawExpression.apply(wi)
+ fei = raw_expression.fei
- return wi.flow_expression_id
+ if async
+
+ t = OpenWFE::call_in_thread "launch()", self do
+ raw_expression.apply(wi)
+ end
+
+ return fei, t
+ end
+
+ raw_expression.apply(wi)
+
+ return fei
end
#
# launches a subprocess
#
- def launch_template \
- (requesting_expression, template, workitem, params=nil)
+ def launch_template (
+ requesting_expression, sub_id, template, workitem, params=nil)
#ldebug { "launch_template() of class #{template.class}" }
rawexp = nil
@@ -203,22 +217,32 @@
onotify :launch_template, rawexp.fei
rawexp = rawexp.dup()
rawexp.fei = rawexp.fei.dup()
- if requesting_expression.kind_of? FlowExpressionId
+ if requesting_expression == nil
+
+ rawexp.parent_id = nil
+ rawexp.fei.workflow_instance_id = new_workflow_instance_id()
+
+ elsif requesting_expression.kind_of? FlowExpressionId
+
rawexp.parent_id = requesting_expression
rawexp.fei.workflow_instance_id = \
- "#{requesting_expression.workflow_instance_id}.0"
+ "#{requesting_expression.workflow_instance_id}.#{sub_id}"
+
elsif requesting_expression.kind_of? String
+
rawexp.parent_id = nil
rawexp.fei.workflow_instance_id = \
- "#{requesting_expression}.0"
+ "#{requesting_expression}.${sub_id}"
+
else # kind is FlowExpression
+
rawexp.parent_id = requesting_expression.fei
rawexp.fei.workflow_instance_id = \
- "#{requesting_expression.fei.workflow_instance_id}.0"
+ "#{requesting_expression.fei.workflow_instance_id}.#{sub_id}"
end
#ldebug do
# p = ""
# p = rawexp.parent_id.to_debug_s if rawexp.parent_id
@@ -240,25 +264,31 @@
rawexp.store_itself()
workitem.flow_expression_id = rawexp.fei
+ fei = rawexp.fei
+
rawexp.apply(workitem)
- # why not : launch in a thread and reply immediately
-
- return workitem.flow_expression_id
+ return fei
end
#
# Evaluates a raw definition expression and
# returns its body fei
#
def evaluate (rawExpression, workitem)
+
exp = rawExpression.instantiate_real_expression(workitem)
fei = exp.evaluate(workitem)
- remove(rawExpression)
+
+ #remove(rawExpression)
+ #
+ # not necessary, the raw expression gets overriden by
+ # the real expression
+
return fei
end
#
# Applies a given expression (id or expression)
@@ -304,10 +334,23 @@
return inflowitem
end
#
+ # Given any expression of a process, cancels the complete process.
+ #
+ def cancel_flow (exp)
+
+ ldebug { "cancel_flow() from #{exp}" }
+
+ root = fetch_root(exp)
+ cancel(root)
+ end
+
+ alias :cancel_process :cancel_flow
+
+ #
# Forgets the given expression (makes sure to substitute its
# parent_id with the GONE_PARENT_ID constant)
#
def forget (exp)
@@ -324,17 +367,16 @@
#
# Replies to the parent of the given expression.
#
def reply_to_parent (exp, workitem)
- exp, fei = fetch(exp)
+ ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }
- workitem.last_expression_id = fei
+ workitem.last_expression_id = exp.fei
- onotify :reply_to_parent, fei, workitem
+ onotify :reply_to_parent, exp.fei, workitem
- #remove(exp, workitem)
remove(exp)
#
# remove all the children of the expression
@@ -381,13 +423,19 @@
#
# Adds or updates a flow expression in this pool
#
def update (flowExpression)
+ t = Timer.new
+
onotify :update, flowExpression.fei, flowExpression
- get_expression_storage()[flowExpression.fei] = flowExpression
+ #get_expression_storage()[flowExpression.fei] = flowExpression
+
+ ldebug { "update() took #{t.duration} ms" }
+
+ return flowExpression
end
#
# Fetches a FlowExpression from the pool.
# Returns a tuple : the FlowExpression plus its FlowExpressionId.
@@ -397,10 +445,13 @@
#
def fetch (exp)
synchronize do
fei = exp
+
+ #ldebug { "fetch() exp is of kind #{exp.class}" }
+
if exp.kind_of? FlowExpression
fei = exp.fei
elsif not exp.kind_of? FlowExpressionId
raise \
"Cannot fetch expression with key : "+
@@ -422,10 +473,23 @@
def fetch_expression (exp)
exp, _fei = fetch(exp)
return exp
end
+ #
+ # Fetches the root expression of a process (given any of its
+ # expressions).
+ #
+ def fetch_root (exp)
+ exp = fetch_expression(exp)
+ return exp unless exp.parent_id
+ return fetch_root(fetch_expression(exp.parent_id))
+ end
+
+ #
+ # Returns the engine environment (the top level environment)
+ #
def fetch_engine_environment ()
synchronize do
eei = engine_environment_id
ee, fei = fetch(eei)
@@ -444,26 +508,30 @@
# Removes a flow expression from the pool
# (This method is mainly called from the pool itself)
#
def remove (exp)
- exp, fei = fetch(exp)
+ #exp, fei = fetch(exp)
+ if exp.kind_of? FlowExpressionId
+ exp, _fei = fetch(exp)
+ end
+
return if not exp
- ldebug { "remove() fe #{fei.to_debug_s}" }
+ ldebug { "remove() fe #{exp.fei.to_debug_s}" }
- onotify :remove, fei
+ onotify :remove, exp.fei
synchronize do
- @monitors.delete(fei)
+ @monitors.delete(exp.fei)
- #get_expression_storage().remove(fei, workitem)
- get_expression_storage().delete(fei)
+ #get_expression_storage().delete(fei)
if exp.owns_its_environment?
+
remove_environment(exp.environment_id)
end
end
end
@@ -472,23 +540,16 @@
# This method is called at each expool (engine) [re]start.
# It roams through the previously saved (persisted) expressions
# to reschedule ones like 'sleep' or 'cron'.
#
def reschedule
+
+ return if @stopped
+
synchronize do
- #if is_stopped?
- # linfo { "reschedule() skipped as expool is stopped" }
- # return
- #end
- #if get_scheduler.is_stopped?
- # linfo do
- # "reschedule() skipped as scheduler "+
- # "#{get_scheduler.object_id} is stopped"
- # end
- # return
- #end
+ t = OpenWFE::Timer.new
ldebug { "reschedule() initiating..." }
get_expression_storage.each_of_kind(Schedulable) do |fe|
@@ -497,11 +558,11 @@
onotify :reschedule, fe.fei
fe.reschedule(get_scheduler)
end
- ldebug { "reschedule() done." }
+ ldebug { "reschedule() done. (took #{t.duration} ms)" }
end
end
#
# Returns the unique engine_environment FlowExpressionId instance.
@@ -525,29 +586,10 @@
end
end
protected
- def reschedule_a_bit_later
- Thread.new do
- #
- # Just leaving some time for the initialize() to finish
- # and let the expression pool get registered in
- # the application context
- #
- begin
- sleep(0.001)
- reschedule()
- rescue
- lwarn do
- "reschedule() failed\n"+
- OpenWFE::exception_to_s($!)
- end
- end
- end
- end
-
def evaluate_definition (raw_definition, workitem)
expression = raw_definition.instantiate(workitem)
end
def remove_environment (environment_id)
@@ -556,11 +598,13 @@
env, fei = fetch(environment_id)
env.unbind()
- get_expression_storage().delete(environment_id)
+ #get_expression_storage().delete(environment_id)
+
+ onotify :remove, environment_id
end
def build_workitem (launchitem)
wi = InFlowWorkItem.new()
@@ -611,11 +655,11 @@
return sDefinition.make()
end
if sDefinition.kind_of? Class
- return sDefinition.do_make(get_expression_map)
+ return sDefinition.do_make()
end
raise \
"Cannot deduce process definition " +
"out of instance of class #{sDefinition.class}"
@@ -638,12 +682,17 @@
end
def new_workflow_instance_id ()
synchronize do
+
wfid = OpenWFE::current_time_millis()
- wfid = wfid + 1 if wfid == @@last_given_instance_id
+
+ wfid = @@last_given_instance_id + 1 \
+ if wfid <= @@last_given_instance_id
+
@@last_given_instance_id = wfid
+
return wfid.to_s
end
end
#