lib/ruote/exp/flow_expression.rb in ruote-2.3.0.1 vs lib/ruote/exp/flow_expression.rb in ruote-2.3.0.2

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -172,10 +172,18 @@ h.parent_id ? Ruote::FlowExpressionId.new(h.parent_id) : nil end + # Returns the child_id for this expression. (The rightmost part of the + # fei.expid). + # + def child_id + + fei.child_id + end + # Fetches the parent expression, or returns nil if there is no parent # expression. # def parent @@ -225,10 +233,18 @@ def root_id(stubborn=false) root(stubborn).fei end + # Concurrent expressions (expressions that apply more than one child + # at a time) are supposed to return true here. + # + def is_concurrent? + + false + end + # Turns this FlowExpression instance into a Hash (well, just hands back # the base hash behind it). # def to_h @@ -240,10 +256,29 @@ def applied_workitem @awi ||= Ruote::Workitem.new(h.applied_workitem) end + # Given an index, returns the child fei (among the currently registered + # children feis) whose fei.expid ends with this index (whose child_id + # is equal to that index). + # + # Returns nil if not found or a child fei as a Hash. + # + def cfei_at(i) + + children.find { |cfei| Ruote.extract_child_id(cfei) == i } + end + + # Returns the list of child_ids (last part of the fei.expid) for the + # currently registered (active) children. + # + def child_ids + + children.collect { |cfei| Ruote.extract_child_id(cfei) } + end + # Instantiates expression back from hash. # def self.from_h(context, h) return self.new(nil, h) unless context @@ -334,10 +369,20 @@ # Called by the worker when it has just created this FlowExpression and # wants to apply it. # def do_apply(msg) + if msg['state'] == 'paused' + + return pause_on_apply(msg) + end + + if msg['flavour'].nil? && (aw = attribute(:await)) + + return await(aw, msg) + end + unless Condition.apply?(attribute(:if), attribute(:unless)) return do_reply_to_parent(h.applied_workitem) end @@ -383,10 +428,56 @@ consider_timers apply end + # Called by #do_apply when msg['state'] == 'paused'. Covers the + # "apply/launch it but it's immediately paused" case. Freezes the + # apply message in h.paused_apply and saves the expression. + # + def pause_on_apply(msg) + + msg['state'] = nil + + h.state = 'paused' + h.paused_apply = msg + + persist_or_raise + end + + # If the expression has an :await attribute, the expression gets + # into a special "awaiting" state until the condition in the value + # of :await gets triggered and the trigger calls resume on the + # expression. + # + def await(att, msg) + + action, condition = + Ruote::Exp::AwaitExpression.extract_await_ac(:await => att) + + raise ::ArgumentError.new( + ":await does not understand #{att.inspect}" + ) if action == nil + + msg.merge!('flavour' => 'awaiting') + + h.state = 'awaiting' + h.paused_apply = msg + + persist_or_raise + + @context.tracker.add_tracker( + h.fei['wfid'], + action, + Ruote.to_storage_id(h.fei), + condition, + { '_auto_remove' => true, + 'action' => 'resume', + 'fei' => h.fei, + 'flavour' => 'awaiting' }) + end + # FlowExpression call this method when they're done and they want their # parent expression to take over (it will end up calling the #reply of # the parent expression). # # Expression implementations are free to override this method. @@ -454,12 +545,18 @@ elsif h.state == nil && h.on_reply trigger('on_reply', workitem) - elsif (h.lost || h.flanking) && h.state.nil? + elsif h.flanking && h.state.nil? # + # do vanish + + do_unpersist + + elsif h.lost && h.state.nil? + # # do not reply, sit here (and wait for cancellation probably) do_persist elsif h.trigger && workitem['fields']["__#{h.trigger}__"] @@ -481,11 +578,11 @@ leave_tag(workitem) if h.tagname (do_unpersist || return) if delete # remove expression from storage - if h.parent_id + if h.parent_id && ! h.attached @context.storage.put_msg( 'reply', 'fei' => h.parent_id, 'workitem' => workitem.merge!('fei' => h.fei), @@ -493,19 +590,22 @@ 'flavour' => flavour) else @context.storage.put_msg( - h.forgotten ? 'ceased' : 'terminated', + (h.forgotten || h.attached) ? 'ceased' : 'terminated', 'wfid' => h.fei['wfid'], 'fei' => h.fei, 'workitem' => workitem, 'variables' => h.variables, 'flavour' => flavour) - if h.state.nil? && h.on_terminate == 'regenerate' && ! h.forgotten - + if + h.state.nil? && + h.on_terminate == 'regenerate' && + ! (h.forgotten || h.attached) + then @context.storage.put_msg( 'regenerate', 'wfid' => h.fei['wfid'], 'tree' => h.original_tree, 'workitem' => workitem, @@ -715,13 +815,18 @@ # Will "unpause" the expression (if it was paused), and trigger any # 'paused_replies' (replies that came while the expression was paused). # def do_resume(msg) - return if h.state != 'paused' + return unless h.state == 'paused' || h.state == 'awaiting' h['state'] = nil + + m = h.delete('paused_apply') + return do_apply(m) if m + # if it's a paused apply, pipe it directly to #do_apply + replies = h.delete('paused_replies') || [] do_persist || return h.children.each { |i| @context.storage.put_msg('resume', 'fei' => i) } @@ -742,10 +847,14 @@ i = h.fei.merge( 'subid' => Ruote.generate_subid(h.fei.inspect), 'expid' => pos) + if ci = opts[:child_id] + i['subid'] = "#{i['subid']}k#{ci}" + end + #p '=== launch_sub ===' #p [ :launcher, h.fei['expid'], h.fei['subid'], h.fei['wfid'] ] #p [ :launched, i['expid'], i['subid'], i['wfid'] ] forget = opts[:forget] @@ -914,10 +1023,10 @@ # def consider_tag tag = attribute(:tag) - return unless tag + return unless tag && tag.strip.size > 0 h.tagname = tag h.full_tagname = applied_workitem.tags.join('/') return if h.trigger