lib/openwfe/expool/expressionpool.rb in openwferu-0.9.2 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.3
- old
+ new
@@ -47,10 +47,11 @@
require 'openwfe/utils'
require 'openwfe/service'
require 'openwfe/logging'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
+require 'openwfe/util/stoppable'
require 'openwfe/util/lru_cache'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw_xml'
include OpenWFE
@@ -118,23 +119,37 @@
# The ExpressionPool stores expressions (pieces of workflow instance).
# It's the core of the workflow engine.
# It relies on an expression storage for actual persistence of the
# expressions.
#
- class ExpressionPool < Service
- include MonitorMixin, OwfeServiceLocator
+ class ExpressionPool
+ include ServiceMixin, MonitorMixin, OwfeServiceLocator, Stoppable
@@last_given_instance_id = -1
#
# storing at class level the last workflow instance id given
- def initialize (serviceName, applicationContext)
- super(serviceName, applicationContext)
+ def initialize (service_name, application_context)
+
+ super()
+
+ service_init(service_name, application_context)
+
@monitors = MonitorProvider.new(application_context)
+
+ reschedule_a_bit_later
end
#
+ # Makes sure to call the do_stop() method of the Stoppable mixin
+ #
+ def stop
+ # would an alias be better ?
+ do_stop
+ end
+
+ #
# Obtains a unique monitor for an expression.
# It avoids the need for the FlowExpression instances to include
# the monitor mixin by themselves
#
def get_monitor (fei)
@@ -149,21 +164,32 @@
rawExpression = buildRawExpression(launchitem)
wi = build_workitem(launchitem)
rawExpression.apply(wi)
+
+ return wi.flow_expression_id
end
#
# launches a subprocess
#
def launch_template \
- (requesting_expression, template_fei, workitem, params=nil)
+ (requesting_expression, template, workitem, params=nil)
- ldebug { "launch() request for #{template_fei.to_debug_s}" }
+ #ldebug { "launch_template() of class #{template.class}" }
- rawexp, fei = fetch(template_fei)
+ rawexp = nil
+
+ if template.kind_of? FlowExpressionId
+ rawexp, fei = fetch(template)
+ else # template is of kind RawExpression
+ rawexp = template
+ end
+
+ ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" }
+
rawexp = rawexp.dup()
rawexp.fei = rawexp.fei.dup()
if requesting_expression.kind_of? FlowExpressionId
rawexp.parent_id = requesting_expression
@@ -176,10 +202,18 @@
else # kind is FlowExpression
rawexp.parent_id = requesting_expression.fei
rawexp.fei.workflow_instance_id = \
"#{requesting_expression.fei.workflow_instance_id}.0"
end
+
+ #ldebug do
+ # p = ""
+ # p = rawexp.parent_id.to_debug_s if rawexp.parent_id
+ # "launch_template()\n"+
+ # " rawexp.fei is #{rawexp.fei.to_debug_s}\n"+
+ # " rawexp.parent_id is #{p}"
+ #end
#ldebug do
# "launch_template() spawning wfid " +
# "#{rawexp.fei.workflow_instance_id.to_s}"
#end
@@ -191,10 +225,12 @@
# the new scope gets its own environment
rawexp.store_itself()
rawexp.apply(workitem)
+
+ return workitem.flow_expression_id
end
#
# Evaluates a raw definition expression and
# returns its body fei
@@ -268,12 +304,18 @@
exp, fei = fetch(exp)
workitem.last_expression_id = fei
- remove(exp, workitem)
+ #remove(exp, workitem)
+ remove(exp)
+ #
+ # remove all the children of the expression
+
+ exp.clean_children()
+
if not exp.parent_id
ldebug do
"reply_to_parent() process " +
"#{exp.fei.workflow_instance_id} terminated"
end
@@ -323,18 +365,22 @@
# The param 'exp' may be a FlowExpressionId or a FlowExpression that
# has to be reloaded.
#
def fetch (exp)
synchronize do
+
fei = exp
if exp.kind_of? FlowExpression
fei = exp.fei
elsif not exp.kind_of? FlowExpressionId
raise \
"Cannot fetch expression with key : "+
"'#{fei}' (#{fei.class})"
end
+
+ ldebug { "fetch() for #{fei.to_debug_s}" }
+
return get_expression_storage()[fei], fei
end
end
#
@@ -358,21 +404,19 @@
ee = Environment\
.new(eei, nil, nil, @application_context, nil)
ee.store_itself()
end
- ldebug { "fetch_engine_environment() stored new ee" }
-
return ee
end
end
#
# Removes a flow expression from the pool
# (This method is mainly called from the pool itself)
#
- def remove (exp, workitem=nil)
+ def remove (exp)
exp, fei = fetch(exp)
return if not exp
@@ -380,20 +424,58 @@
synchronize do
@monitors.delete(fei)
- #get_expression_storage().delete(fei)
- get_expression_storage().remove(fei, workitem)
+ #get_expression_storage().remove(fei, workitem)
+ get_expression_storage().delete(fei)
if exp.owns_its_environment?
remove_environment(exp.environment_id)
end
end
end
+ #
+ # This method is called at each expool (engine) [re]start.
+ # It roams through the previously saved (persisted) expressions
+ # to reschedule ones like 'sleep' or 'cron'.
+ #
+ def reschedule
+ synchronize do
+
+ #if is_stopped?
+ # linfo { "reschedule() skipped as expool is stopped" }
+ # return
+ #end
+ #if get_scheduler.is_stopped?
+ # linfo do
+ # "reschedule() skipped as scheduler "+
+ # "#{get_scheduler.object_id} is stopped"
+ # end
+ # return
+ #end
+
+ ldebug { "reschedule() initiating..." }
+
+ get_expression_storage.each_of_kind(Schedulable) do |fe|
+
+ ldebug { "reschedule() for #{fe.fei.to_debug_s}..." }
+
+ fe.reschedule(get_scheduler)
+ end
+
+ ldebug { "reschedule() done." }
+ end
+ end
+
+ #
+ # Returns the unique engine_environment FlowExpressionId instance.
+ # There is only one such environment in an engine, hence this
+ # 'singleton' method.
+ #
def engine_environment_id ()
synchronize do
return @eei if @eei
@eei = FlowExpressionId.new
@eei.owfe_version = OPENWFE_VERSION
@@ -409,18 +491,37 @@
end
end
protected
+ def reschedule_a_bit_later
+ Thread.new do
+ #
+ # Just leaving some time for the initialize() to finish
+ # and let the expression pool get registered in
+ # the application context
+ #
+ begin
+ sleep(0.001)
+ reschedule()
+ rescue
+ lwarn do
+ "reschedule() failed\n"+
+ OpenWFE::exception_to_s($!)
+ end
+ end
+ end
+ end
+
def evaluate_definition (raw_definition, workitem)
expression = raw_definition.instantiate(workitem)
end
def remove_environment (environment_id)
env, fei = fetch(environment_id)
env.unbind()
- get_expression_storage().remove(environment_id, nil)
+ get_expression_storage().delete(environment_id)
end
def build_workitem (launchitem)
wi = InFlowWorkItem.new()
@@ -496,9 +597,10 @@
fei.expression_name = exp_name
return fei
end
def new_workflow_instance_id ()
+
synchronize do
wfid = OpenWFE::current_time_millis()
wfid = wfid + 1 if wfid == @@last_given_instance_id
@@last_given_instance_id = wfid
return wfid.to_s