lib/ruote/exp/flow_expression.rb in ruote-2.3.0.1 vs lib/ruote/exp/flow_expression.rb in ruote-2.3.0.2
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -172,10 +172,18 @@
h.parent_id ?
Ruote::FlowExpressionId.new(h.parent_id) :
nil
end
+ # Returns the child_id for this expression. (The rightmost part of the
+ # fei.expid).
+ #
+ def child_id
+
+ fei.child_id
+ end
+
# Fetches the parent expression, or returns nil if there is no parent
# expression.
#
def parent
@@ -225,10 +233,18 @@
def root_id(stubborn=false)
root(stubborn).fei
end
+ # Concurrent expressions (expressions that apply more than one child
+ # at a time) are supposed to return true here.
+ #
+ def is_concurrent?
+
+ false
+ end
+
# Turns this FlowExpression instance into a Hash (well, just hands back
# the base hash behind it).
#
def to_h
@@ -240,10 +256,29 @@
def applied_workitem
@awi ||= Ruote::Workitem.new(h.applied_workitem)
end
+ # Given an index, returns the child fei (among the currently registered
+ # children feis) whose fei.expid ends with this index (whose child_id
+ # is equal to that index).
+ #
+ # Returns nil if not found or a child fei as a Hash.
+ #
+ def cfei_at(i)
+
+ children.find { |cfei| Ruote.extract_child_id(cfei) == i }
+ end
+
+ # Returns the list of child_ids (last part of the fei.expid) for the
+ # currently registered (active) children.
+ #
+ def child_ids
+
+ children.collect { |cfei| Ruote.extract_child_id(cfei) }
+ end
+
# Instantiates expression back from hash.
#
def self.from_h(context, h)
return self.new(nil, h) unless context
@@ -334,10 +369,20 @@
# Called by the worker when it has just created this FlowExpression and
# wants to apply it.
#
def do_apply(msg)
+ if msg['state'] == 'paused'
+
+ return pause_on_apply(msg)
+ end
+
+ if msg['flavour'].nil? && (aw = attribute(:await))
+
+ return await(aw, msg)
+ end
+
unless Condition.apply?(attribute(:if), attribute(:unless))
return do_reply_to_parent(h.applied_workitem)
end
@@ -383,10 +428,56 @@
consider_timers
apply
end
+ # Called by #do_apply when msg['state'] == 'paused'. Covers the
+ # "apply/launch it but it's immediately paused" case. Freezes the
+ # apply message in h.paused_apply and saves the expression.
+ #
+ def pause_on_apply(msg)
+
+ msg['state'] = nil
+
+ h.state = 'paused'
+ h.paused_apply = msg
+
+ persist_or_raise
+ end
+
+ # If the expression has an :await attribute, the expression gets
+ # into a special "awaiting" state until the condition in the value
+ # of :await gets triggered and the trigger calls resume on the
+ # expression.
+ #
+ def await(att, msg)
+
+ action, condition =
+ Ruote::Exp::AwaitExpression.extract_await_ac(:await => att)
+
+ raise ::ArgumentError.new(
+ ":await does not understand #{att.inspect}"
+ ) if action == nil
+
+ msg.merge!('flavour' => 'awaiting')
+
+ h.state = 'awaiting'
+ h.paused_apply = msg
+
+ persist_or_raise
+
+ @context.tracker.add_tracker(
+ h.fei['wfid'],
+ action,
+ Ruote.to_storage_id(h.fei),
+ condition,
+ { '_auto_remove' => true,
+ 'action' => 'resume',
+ 'fei' => h.fei,
+ 'flavour' => 'awaiting' })
+ end
+
# FlowExpression call this method when they're done and they want their
# parent expression to take over (it will end up calling the #reply of
# the parent expression).
#
# Expression implementations are free to override this method.
@@ -454,12 +545,18 @@
elsif h.state == nil && h.on_reply
trigger('on_reply', workitem)
- elsif (h.lost || h.flanking) && h.state.nil?
+ elsif h.flanking && h.state.nil?
#
+ # do vanish
+
+ do_unpersist
+
+ elsif h.lost && h.state.nil?
+ #
# do not reply, sit here (and wait for cancellation probably)
do_persist
elsif h.trigger && workitem['fields']["__#{h.trigger}__"]
@@ -481,11 +578,11 @@
leave_tag(workitem) if h.tagname
(do_unpersist || return) if delete
# remove expression from storage
- if h.parent_id
+ if h.parent_id && ! h.attached
@context.storage.put_msg(
'reply',
'fei' => h.parent_id,
'workitem' => workitem.merge!('fei' => h.fei),
@@ -493,19 +590,22 @@
'flavour' => flavour)
else
@context.storage.put_msg(
- h.forgotten ? 'ceased' : 'terminated',
+ (h.forgotten || h.attached) ? 'ceased' : 'terminated',
'wfid' => h.fei['wfid'],
'fei' => h.fei,
'workitem' => workitem,
'variables' => h.variables,
'flavour' => flavour)
- if h.state.nil? && h.on_terminate == 'regenerate' && ! h.forgotten
-
+ if
+ h.state.nil? &&
+ h.on_terminate == 'regenerate' &&
+ ! (h.forgotten || h.attached)
+ then
@context.storage.put_msg(
'regenerate',
'wfid' => h.fei['wfid'],
'tree' => h.original_tree,
'workitem' => workitem,
@@ -715,13 +815,18 @@
# Will "unpause" the expression (if it was paused), and trigger any
# 'paused_replies' (replies that came while the expression was paused).
#
def do_resume(msg)
- return if h.state != 'paused'
+ return unless h.state == 'paused' || h.state == 'awaiting'
h['state'] = nil
+
+ m = h.delete('paused_apply')
+ return do_apply(m) if m
+ # if it's a paused apply, pipe it directly to #do_apply
+
replies = h.delete('paused_replies') || []
do_persist || return
h.children.each { |i| @context.storage.put_msg('resume', 'fei' => i) }
@@ -742,10 +847,14 @@
i = h.fei.merge(
'subid' => Ruote.generate_subid(h.fei.inspect),
'expid' => pos)
+ if ci = opts[:child_id]
+ i['subid'] = "#{i['subid']}k#{ci}"
+ end
+
#p '=== launch_sub ==='
#p [ :launcher, h.fei['expid'], h.fei['subid'], h.fei['wfid'] ]
#p [ :launched, i['expid'], i['subid'], i['wfid'] ]
forget = opts[:forget]
@@ -914,10 +1023,10 @@
#
def consider_tag
tag = attribute(:tag)
- return unless tag
+ return unless tag && tag.strip.size > 0
h.tagname = tag
h.full_tagname = applied_workitem.tags.join('/')
return if h.trigger