lib/openwfe/expool/expressionpool.rb in openwferu-0.9.15 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.16
- old
+ new
@@ -188,11 +188,11 @@
# before the actual launch is completely over.
#
# Returns the FlowExpressionId instance of the root expression of
# the newly launched flow.
#
- def launch (launchitem, options)
+ def launch (launchitem, options={})
#
# prepare raw expression
raw_expression = prepare_raw_expression launchitem
@@ -205,11 +205,11 @@
# as this expression is the root of a new process instance,
# it has to have an environment for all the variables of
# the process instance
raw_expression = wrap_in_schedule(raw_expression, options) \
- if options and options.size > 0
+ if options.size > 0
fei = raw_expression.fei
#
# apply prepared raw expression
@@ -232,22 +232,28 @@
#
def prepare_from_template (
requesting_expression, sub_id, template, params=nil)
rawexp = if template.is_a?(RawExpression)
+
+ template.application_context = @application_context
template
+
elsif template.is_a?(FlowExpressionId)
- fetch_expression(template)
+
+ fetch_expression template
+
else
- build_raw_expression(nil, template)
+
+ build_raw_expression nil, template
end
#raise "did not find subprocess in : #{template.to_s}" \
# unless rawexp
- rawexp = rawexp.dup()
- rawexp.fei = rawexp.fei.dup()
+ rawexp = rawexp.dup
+ rawexp.fei = rawexp.fei.dup
if requesting_expression == nil
rawexp.parent_id = nil
rawexp.fei.workflow_instance_id = get_wfid_generator.generate
@@ -274,15 +280,15 @@
#ldebug do
# "launch_template() spawning wfid " +
# "#{rawexp.fei.workflow_instance_id.to_s}"
#end
- env = rawexp.new_environment(params)
+ env = rawexp.new_environment params
#
# the new scope gets its own environment
- rawexp.store_itself()
+ rawexp.store_itself
rawexp
end
#
@@ -301,27 +307,27 @@
apply rawexp, workitem
rawexp.fei
end
- #
+ #--
# Evaluates a raw definition expression and
# returns its body fei
#
- def evaluate (rawExpression, workitem)
+ #def evaluate (rawExpression, workitem)
+ # #rawExpression = fetch_expression(rawExpression) \
+ # # if rawExpression.is_a?(FlowExpressionId)
+ # exp = rawExpression.instantiate_real_expression workitem
+ # fei = exp.evaluate workitem
+ # #remove(rawExpression)
+ # #
+ # # not necessary, the raw expression gets overriden by
+ # # the real expression
+ # fei
+ #end
+ #++
- exp = rawExpression.instantiate_real_expression workitem
- fei = exp.evaluate workitem
-
- #remove(rawExpression)
- #
- # not necessary, the raw expression gets overriden by
- # the real expression
-
- fei
- end
-
#
# Applies a given expression (id or expression)
#
def apply (exp, workitem)
@@ -368,13 +374,13 @@
# care of removing the cancelled expression from the parent
# expression.
#
def cancel_expression (exp)
- exp = fetch_expression(exp)
+ exp = fetch_expression exp
- wi = cancel(exp)
+ wi = cancel exp
if wi
reply_to_parent(exp, wi, false)
else
parent_exp = fetch_expression(exp.parent_id)
@@ -417,62 +423,10 @@
ldebug { "forget() forgot #{fei}" }
end
#
- # Pauses a process (sets its '/__paused__' variable to true).
- #
- def pause_process (wfid)
-
- wfid = extract_wfid(wfid)
-
- root_expression = fetch_root(wfid)
-
- root_expression.set_variable(VAR_PAUSED, true)
- end
-
- #
- # Restarts a process : removes its 'paused' flag (variable) and makes
- # sure to 'replay' events (replies) that came for it while it was
- # in pause.
- #
- def resume_process (wfid)
-
- wfid = extract_wfid(wfid)
-
- root_expression = fetch_root(wfid)
-
- #
- # remove 'paused' flag
-
- root_expression.unset_variable(VAR_PAUSED)
-
- #
- # replay
-
- journal = get_error_journal
-
- # select PausedError instances in separate list
-
- errors = journal.get_error_log(wfid)
- error_class = PausedError.name
- paused_errors = errors.select { |e| e.error_class == error_class }
-
- return if paused_errors.size < 1
-
- # remove them from current error journal
-
- journal.remove_errors wfid, paused_errors
-
- # replay select PausedError instances
-
- paused_errors.each do |e|
- journal.replay_at_error e
- end
- end
-
- #
# Replies to the parent of the given expression.
#
def reply_to_parent (exp, workitem, remove=true)
ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }
@@ -536,20 +490,20 @@
#
# Adds or updates a flow expression in this pool
#
def update (flow_expression)
- #ldebug { "update() for #{flow_expression.fei.to_debug_s}" }
+ ldebug { "update() for #{flow_expression.fei.to_debug_s}" }
- t = Timer.new
+ #t = Timer.new
onotify :update, flow_expression.fei, flow_expression
- ldebug do
- "update() took #{t.duration} ms " +
- "#{flow_expression.fei.to_debug_s}"
- end
+ #ldebug do
+ # "update() took #{t.duration} ms " +
+ # "#{flow_expression.fei.to_debug_s}"
+ #end
flow_expression
end
#
@@ -560,20 +514,25 @@
# has to be reloaded.
#
def fetch (exp)
synchronize do
- fei = exp
-
#ldebug { "fetch() exp is of kind #{exp.class}" }
- if exp.kind_of?(FlowExpression)
- fei = exp.fei
+ fei = if exp.kind_of?(FlowExpression)
+
+ exp.fei
+
elsif not exp.kind_of?(FlowExpressionId)
+
raise \
"Cannot fetch expression with key : "+
"'#{fei}' (#{fei.class})"
+
+ else
+
+ exp
end
#ldebug { "fetch() for #{fei.to_debug_s}" }
[ get_expression_storage()[fei], fei ]
@@ -586,20 +545,22 @@
# The param 'exp' may be a FlowExpressionId or a FlowExpression that
# has to be reloaded.
#
def fetch_expression (exp)
- exp, _fei = fetch(exp)
+ exp, fei = fetch(exp)
exp
end
#
# Fetches the root expression of a process (given any of its
# expressions or its wfid).
#
def fetch_root (exp_or_wfid)
+ ldebug { "fetch_root() '#{exp_or_wfid.to_s}'" }
+
return fetch_expression_with_wfid(exp_or_wfid) \
if exp_or_wfid.is_a?(String)
exp = fetch_expression(exp_or_wfid)
@@ -636,11 +597,11 @@
# (This method is mainly called from the pool itself)
#
def remove (exp)
exp, _fei = fetch(exp) \
- if exp.kind_of?(FlowExpressionId)
+ if exp.is_a?(FlowExpressionId)
return unless exp
ldebug { "remove() fe #{exp.fei.to_debug_s}" }
@@ -668,16 +629,16 @@
t = OpenWFE::Timer.new
linfo { "reschedule() initiating..." }
- get_expression_storage.each_of_kind(Schedulable) do |fe|
+ get_expression_storage.each_of_kind(Schedulable) do |fei, fe|
- #linfo { "reschedule() for #{fe.fei.to_debug_s}..." }
- linfo { "reschedule() for #{fe.fei.to_s}..." }
+ #linfo { "reschedule() for #{fei.to_debug_s}..." }
+ linfo { "reschedule() for #{fei.to_s}..." }
- onotify :reschedule, fe.fei
+ onotify :reschedule, fei
fe.reschedule(get_scheduler)
end
linfo { "reschedule() done. (took #{t.duration} ms)" }
@@ -760,12 +721,14 @@
# collect() would look better
get_expression_storage.real_each(wfid_prefix) do |fei, fexp|
- next unless fexp.is_a? DefineExpression
+ #ldebug { "list_processes() class is #{fexp.class.name}" }
+ next unless fexp.is_a?(DefineExpression)
+
next if not consider_subprocesses and fei.wfid.index(".")
#next unless fei.wfid.match("^#{wfid_prefix}") if wfid_prefix
result << fexp
@@ -777,11 +740,19 @@
#
# Returns the first expression found with the given wfid.
#
def fetch_expression_with_wfid (wfid)
- list_processes(false, wfid)[0]
+ #list_processes(false, wfid)[0]
+
+ ps = list_processes(false, wfid)
+
+ ldebug do
+ "fetch_expression_with_wfid() '#{wfid}' found #{ps.size} items"
+ end
+
+ ps[0]
end
#
# This method is called when apply() or reply() failed for
# an expression.
@@ -849,35 +820,33 @@
#
# The real apply work.
#
def do_apply (exp, workitem)
- exp, fei = fetch(exp) if exp.kind_of?(FlowExpressionId)
+ exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId)
check_if_paused exp
- #ldebug { "apply() '#{fei}' (#{fei.class})" }
+ #ldebug { "apply() '#{_fei}'" }
if not exp
- lwarn { "apply() cannot apply missing #{fei.to_debug_s}" }
+ lwarn do
+ "do_apply() cannot apply missing #{_fei.to_debug_s}"
+ end
+
return
- #raise "apply() cannot apply missing #{fei.to_debug_s}"
+ #raise "apply() cannot apply missing #{_fei.to_debug_s}"
+ # not very helpful anyway
end
- #ldebug { "apply() #{fei.to_debug_s}" }
-
- #exp.apply_time = OpenWFE::now()
- #
- # this is done in RawExpression
-
workitem.flow_expression_id = exp.fei
onotify :apply, exp, workitem
- exp.apply(workitem)
+ exp.apply workitem
end
#
# The real reply work is done here
#
@@ -939,14 +908,15 @@
att = if oat
{ "until" => oat }
else #oin
{ "for" => oin }
end
+ att["scheduler-tags"] = "scheduled-launch"
sle = get_expression_map.get_class(:sleep)
sle = sle.new(fei.dup, fei, nil, application_context, att)
- sle.fei.expression_id = "0.1"
+ sle.fei.expression_id = "0.0"
sle.fei.expression_name = "sleep"
seq.children << sle.fei
seq.children << raw_expression.fei
seq.new_environment
@@ -966,10 +936,11 @@
{ "tab" => ocron }
else #oevery
{ "every" => oevery }
end
att["name"] = "//cron_launch__#{fei.wfid}"
+ att["scheduler-tags"] = "scheduled-launch"
cro = get_expression_map.get_class(:cron)
cro = cro.new(fei, nil, nil, application_context, att)
cro.children << raw_expression.fei
@@ -1126,10 +1097,10 @@
# The param can be a template or a definition (anything
# accepted by the determine_representation() method).
#
def build_raw_expression (launchitem, param)
- procdef = determine_representation(param)
+ procdef = determine_representation param
#return procdef if procdef.is_a? RawExpression
flow_name = procdef.attributes['name']
flow_revision = procdef.attributes['revision']