lib/openwfe/expool/expressionpool.rb in openwferu-0.9.3 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.4
- old
+ new
@@ -49,10 +49,11 @@
require 'openwfe/logging'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
require 'openwfe/util/stoppable'
require 'openwfe/util/lru_cache'
+require 'openwfe/util/observable'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw_xml'
include OpenWFE
@@ -120,11 +121,16 @@
# It's the core of the workflow engine.
# It relies on an expression storage for actual persistence of the
# expressions.
#
class ExpressionPool
- include ServiceMixin, MonitorMixin, OwfeServiceLocator, Stoppable
+ include \
+ ServiceMixin,
+ MonitorMixin,
+ OwfeServiceLocator,
+ Stoppable,
+ Observable
@@last_given_instance_id = -1
#
# storing at class level the last workflow instance id given
@@ -134,19 +140,23 @@
service_init(service_name, application_context)
@monitors = MonitorProvider.new(application_context)
+ @observers = {}
+
reschedule_a_bit_later
end
#
# Makes sure to call the do_stop() method of the Stoppable mixin
#
def stop
# would an alias be better ?
do_stop
+
+ onotify :stop
end
#
# Obtains a unique monitor for an expression.
# It avoids the need for the FlowExpression instances to include
@@ -159,10 +169,12 @@
#
# Instantiates a workflow definition and launches it.
#
def launch (launchitem)
+ onotify :launch, launchitem.workflow_definition_url
+
rawExpression = buildRawExpression(launchitem)
wi = build_workitem(launchitem)
rawExpression.apply(wi)
@@ -186,10 +198,12 @@
rawexp = template
end
ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" }
+ onotify :launch_template, rawexp.fei
+
rawexp = rawexp.dup()
rawexp.fei = rawexp.fei.dup()
if requesting_expression.kind_of? FlowExpressionId
rawexp.parent_id = requesting_expression
@@ -224,12 +238,16 @@
#
# the new scope gets its own environment
rawexp.store_itself()
+ workitem.flow_expression_id = rawexp.fei
+
rawexp.apply(workitem)
+ # why not : launch in a thread and reply immediately
+
return workitem.flow_expression_id
end
#
# Evaluates a raw definition expression and
@@ -247,21 +265,23 @@
#
def apply (exp, workitem)
exp, fei = fetch(exp)
- ldebug { "apply() '#{fei}' (#{fei.class})" }
+ #ldebug { "apply() '#{fei}' (#{fei.class})" }
if not exp
lwarn { "apply() cannot apply missing #{fei.to_debug_s}" }
return
end
- ldebug { "apply() #{fei.to_debug_s}" }
+ #ldebug { "apply() #{fei.to_debug_s}" }
- workitem.last_expression_id = exp.fei
+ onotify :apply, fei, workitem
+ workitem.flow_expression_id = exp.fei
+
exp.apply(workitem)
end
#
# Cancels the given expression
@@ -275,10 +295,12 @@
return nil
end
ldebug { "cancel() for #{fei.to_debug_s}" }
+ onotify :cancel, fei
+
inflowitem = exp.cancel()
remove(exp)
return inflowitem
end
@@ -291,10 +313,12 @@
exp, fei = fetch(exp)
return if not exp
+ onotify :forget, fei
+
exp.parent_id = GONE_PARENT_ID
exp.store_itself()
end
#
@@ -304,10 +328,12 @@
exp, fei = fetch(exp)
workitem.last_expression_id = fei
+ onotify :reply_to_parent, fei, workitem
+
#remove(exp, workitem)
remove(exp)
#
# remove all the children of the expression
@@ -345,18 +371,22 @@
#raise "cannot reply to missing #{fei.to_debug_s}"
lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" }
return
end
+ onotify :reply, fei, workitem
+
exp.reply(workitem)
end
#
# Adds or updates a flow expression in this pool
#
def update (flowExpression)
+ onotify :update, flowExpression.fei, flowExpression
+
get_expression_storage()[flowExpression.fei] = flowExpression
end
#
# Fetches a FlowExpression from the pool.
@@ -420,10 +450,12 @@
return if not exp
ldebug { "remove() fe #{fei.to_debug_s}" }
+ onotify :remove, fei
+
synchronize do
@monitors.delete(fei)
#get_expression_storage().remove(fei, workitem)
@@ -460,10 +492,12 @@
get_expression_storage.each_of_kind(Schedulable) do |fe|
ldebug { "reschedule() for #{fe.fei.to_debug_s}..." }
+ onotify :reschedule, fe.fei
+
fe.reschedule(get_scheduler)
end
ldebug { "reschedule() done." }
end
@@ -476,11 +510,11 @@
#
def engine_environment_id ()
synchronize do
return @eei if @eei
@eei = FlowExpressionId.new
- @eei.owfe_version = OPENWFE_VERSION
+ @eei.owfe_version = OPENWFERU_VERSION
@eei.engine_id = get_engine.service_name
@eei.initial_engine_id = @eei.engine_id
@eei.workflow_definition_url = 'ee'
@eei.workflow_definition_name = 'ee'
@eei.workflow_definition_revision = '0'
@@ -515,12 +549,17 @@
def evaluate_definition (raw_definition, workitem)
expression = raw_definition.instantiate(workitem)
end
def remove_environment (environment_id)
+
+ ldebug { "remove_environment() #{environment_id.to_debug_s}" }
+
env, fei = fetch(environment_id)
+
env.unbind()
+
get_expression_storage().delete(environment_id)
end
def build_workitem (launchitem)
@@ -584,10 +623,10 @@
def new_fei (flow_url, flow_name, flow_revision, exp_name)
fei = FlowExpressionId.new
- fei.owfe_version = OPENWFE_VERSION
+ fei.owfe_version = OPENWFERU_VERSION
fei.engine_id = get_engine.service_name
fei.initial_engine_id = fei.engine_id
fei.workflow_definition_url = flow_url
fei.workflow_definition_name = flow_name
fei.workflow_definition_revision = flow_revision