lib/ruote/exp/flowexpression.rb in ruote-2.1.4 vs lib/ruote/exp/flowexpression.rb in ruote-2.1.5

- old
+ new

@@ -143,20 +143,35 @@ # apply/reply #++ def self.do_action (context, msg) + fei = msg['fei'] + action = msg['action'] + + if action == 'reply' && fei['engine_id'] != context.engine_id + + ep = context.plist.lookup(fei['engine_id']) + + raise( + "no EngineParticipant found under name '#{fei['engine_id']}'" + ) unless ep + + ep.reply(fei, msg['workitem']) + return + end + fexp = nil 3.times do fexp = fetch(context, msg['fei']) break if fexp sleep 0.028 end # this retry system is only useful with ruote-couch - fexp.send("do_#{msg['action']}", msg) if fexp + fexp.send("do_#{action}", msg) if fexp end def do_apply if not Condition.apply?(attribute(:if), attribute(:unless)) @@ -182,19 +197,10 @@ apply end def reply_to_parent (workitem, delete=true) - #if delete && h.state.nil? - # p @msg - # if @msg && @msg['action'] == 'reply' - # do_unpersist || return - # else - # unpersist_or_raise - # end - #end - if h.tagname unset_variable(h.tagname) @context.storage.put_msg( @@ -289,20 +295,20 @@ # The raw handling of messages passed to expressions (the fine handling # is done in the #cancel method). # def do_cancel (msg) - return if h.state == 'cancelling' - # cancel on cancel gets discarded - - @msg = Ruote.fulldup(msg) - flavour = msg['flavour'] + return if h.state == 'cancelling' && flavour != 'kill' + # cancel on cancel gets discarded + return if h.state == 'failed' && flavour == 'timeout' # do not timeout expressions that are "in error" (failed) + @msg = Ruote.fulldup(msg) + h.state = case flavour when 'kill' then 'dying' when 'timeout' then 'timing_out' else 'cancelling' end @@ -488,12 +494,50 @@ def tree_children tree[2] end + # Generates a sub_wfid, without hitting storage. + # + # There's a better implementation for sure... + # + def get_next_sub_wfid + + i = [ + $$, Time.now.to_f.to_s, self.hash.to_s, @h['fei'].inspect + ].join('-').hash + + (i < 0 ? "1#{i * -1}" : "0#{i}").to_s + end + protected + def to_dot (opts) + + i = fei() + + label = "#{[ i.wfid, i.sub_wfid, i.expid].join(" ")} #{tree.first}" + label += " (#{h.state})" if h.state + + a = [] + a << "\"#{i.to_storage_id}\" [ label=\"#{label}\" ];" + + # parent + + if h.parent_id + a << "\"#{i.to_storage_id}\" -> \"#{parent_id.to_storage_id}\";" + end + + # children + + h.children.each do |cfei| + a << "\"#{i.to_storage_id}\" -> \"#{Ruote.to_storage_id(cfei)}\";" + end + + a + end + def pre_apply_child (child_index, workitem, forget) child_fei = h.fei.merge('expid' => "#{h.fei['expid']}_#{child_index}") h.children << child_fei unless forget @@ -515,22 +559,9 @@ msg = pre_apply_child(child_index, workitem, forget) persist_or_raise unless forget @context.storage.put_msg('apply', msg) - end - - # Generates a sub_wfid, without hitting storage. - # - # There's a better implementation for sure... - # - def get_next_sub_wfid - - i = [ - $$, Time.now.to_f.to_s, self.hash.to_s, @h['fei'].inspect - ].join('-').hash - - (i < 0 ? "1#{i * -1}" : "0#{i}").to_s end def register_child (fei) h.children << fei