lib/ruote/exp/flowexpression.rb in ruote-2.1.4 vs lib/ruote/exp/flowexpression.rb in ruote-2.1.5
- old
+ new
@@ -143,20 +143,35 @@
# apply/reply
#++
def self.do_action (context, msg)
+ fei = msg['fei']
+ action = msg['action']
+
+ if action == 'reply' && fei['engine_id'] != context.engine_id
+
+ ep = context.plist.lookup(fei['engine_id'])
+
+ raise(
+ "no EngineParticipant found under name '#{fei['engine_id']}'"
+ ) unless ep
+
+ ep.reply(fei, msg['workitem'])
+ return
+ end
+
fexp = nil
3.times do
fexp = fetch(context, msg['fei'])
break if fexp
sleep 0.028
end
# this retry system is only useful with ruote-couch
- fexp.send("do_#{msg['action']}", msg) if fexp
+ fexp.send("do_#{action}", msg) if fexp
end
def do_apply
if not Condition.apply?(attribute(:if), attribute(:unless))
@@ -182,19 +197,10 @@
apply
end
def reply_to_parent (workitem, delete=true)
- #if delete && h.state.nil?
- # p @msg
- # if @msg && @msg['action'] == 'reply'
- # do_unpersist || return
- # else
- # unpersist_or_raise
- # end
- #end
-
if h.tagname
unset_variable(h.tagname)
@context.storage.put_msg(
@@ -289,20 +295,20 @@
# The raw handling of messages passed to expressions (the fine handling
# is done in the #cancel method).
#
def do_cancel (msg)
- return if h.state == 'cancelling'
- # cancel on cancel gets discarded
-
- @msg = Ruote.fulldup(msg)
-
flavour = msg['flavour']
+ return if h.state == 'cancelling' && flavour != 'kill'
+ # cancel on cancel gets discarded
+
return if h.state == 'failed' && flavour == 'timeout'
# do not timeout expressions that are "in error" (failed)
+ @msg = Ruote.fulldup(msg)
+
h.state = case flavour
when 'kill' then 'dying'
when 'timeout' then 'timing_out'
else 'cancelling'
end
@@ -488,12 +494,50 @@
def tree_children
tree[2]
end
+ # Generates a sub_wfid, without hitting storage.
+ #
+ # There's a better implementation for sure...
+ #
+ def get_next_sub_wfid
+
+ i = [
+ $$, Time.now.to_f.to_s, self.hash.to_s, @h['fei'].inspect
+ ].join('-').hash
+
+ (i < 0 ? "1#{i * -1}" : "0#{i}").to_s
+ end
+
protected
+ def to_dot (opts)
+
+ i = fei()
+
+ label = "#{[ i.wfid, i.sub_wfid, i.expid].join(" ")} #{tree.first}"
+ label += " (#{h.state})" if h.state
+
+ a = []
+ a << "\"#{i.to_storage_id}\" [ label=\"#{label}\" ];"
+
+ # parent
+
+ if h.parent_id
+ a << "\"#{i.to_storage_id}\" -> \"#{parent_id.to_storage_id}\";"
+ end
+
+ # children
+
+ h.children.each do |cfei|
+ a << "\"#{i.to_storage_id}\" -> \"#{Ruote.to_storage_id(cfei)}\";"
+ end
+
+ a
+ end
+
def pre_apply_child (child_index, workitem, forget)
child_fei = h.fei.merge('expid' => "#{h.fei['expid']}_#{child_index}")
h.children << child_fei unless forget
@@ -515,22 +559,9 @@
msg = pre_apply_child(child_index, workitem, forget)
persist_or_raise unless forget
@context.storage.put_msg('apply', msg)
- end
-
- # Generates a sub_wfid, without hitting storage.
- #
- # There's a better implementation for sure...
- #
- def get_next_sub_wfid
-
- i = [
- $$, Time.now.to_f.to_s, self.hash.to_s, @h['fei'].inspect
- ].join('-').hash
-
- (i < 0 ? "1#{i * -1}" : "0#{i}").to_s
end
def register_child (fei)
h.children << fei