lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.4 vs lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.5

- old
+ new

@@ -39,20 +39,98 @@ # John Mettraux at openwfe.org # require 'openwfe/utils' require 'openwfe/rudefinitions' +require 'openwfe/expressions/condition' require 'openwfe/expressions/flowexpression' # # base expressions like 'sequence' and 'concurrence' # module OpenWFE + # + # The concurrence expression will execute each of its (direct) children + # in parallel threads. + # + # Thus, + # + # <concurrence> + # <participant ref="pa" /> + # <participant ref="pb" /> + # </concurrence> + # + # Participants pa and pb will be 'treated' in parallel (quasi + # simultaneously). + # + # The concurrence expressions accept many attributes, that can get + # combined. By default, the concurrence waits for all its children to + # reply and returns the workitem of the first child that replied. + # The attributes tune this behaviour. + # + # <em>count</em> + # + # <concurrence count="1"> + # <participant ref="pa" /> + # <participant ref="pb" /> + # </concurrence> + # + # The concurrence will be over as soon as 'pa' or 'pb' replied, i.e. + # as soon as "1" child replied. + # + # <em>remaining</em> + # + # The attribute 'remaining' can take two values 'cancel' (the default) and + # 'forget'. + # Cancelled children are completely wiped away, forgotten ones continue + # to operate but their reply will simply get discarded. + # + # <em>over-if</em> + # + # 'over-if' accepts a 'boolean expression' (something replying 'true' or + # 'false'), if the expression evaluates to true, the concurrence will be + # over and the remaining children will get cancelled (the default) or + # forgotten. + # + # <em>merge</em> + # + # By default, the first child to reply to its parent 'concurrence' + # expression 'wins', i.e. its workitem is used for resuming the flow (after + # the concurrence). + # + # [first] The default : the first child to reply wins + # [last] The last child to reply wins + # [highest] The first 'defined' child (in the list of children) will win + # [lowest] The last 'defined' child (in the list of children) will win + # + # Thus, in that example + # + # <concurrence merge="lowest"> + # <participant ref="pa" /> + # <participant ref="pb" /> + # </concurrence> + # + # when the concurrence is done, the workitem of 'pb' is used to resume the + # flow after the concurrence. + # + # <em>merge-type</em> + # + # [override] The default : no mix of values between the workitems do occur + # [mix] Priority is given to the 'winning' workitem but their values + # get mixed + # + # The merge occurs are the top level of workitem attributes. + # + # More complex merge behaviour can be obtained by extending the + # GenericSyncExpression class. But the default sync options are already + # numerous and powerful. + # class ConcurrenceExpression < SequenceExpression + include ConditionMixin attr_accessor \ :sync_expression def apply (workitem) @@ -71,10 +149,11 @@ concurrence = self @children.each do |child| Thread.new do begin + #ldebug { "apply() child : #{child.to_debug_s}" } concurrence.synchronize do get_expression_pool().apply(child, workitem.dup) end rescue Exception => e lwarn do @@ -139,10 +218,15 @@ @reply_count = 0 @count = determine_count(synchable, workitem) @cancel_remaining = cancel_remaining?(synchable, workitem) + merge = synchable.lookup_attribute(:merge, workitem, :first) + merge_type = synchable.lookup_attribute(:merge_type, workitem, :mix) + + @merge_array = MergeArray.new(merge, merge_type) + @unready_queue = [] end # # when all the children got applied concurrently, the concurrence @@ -160,11 +244,15 @@ queue = @unready_queue @unready_queue = nil synchable.store_itself() queue.each do |workitem| - do_reply(synchable, workitem) + break if do_reply(synchable, workitem) + # + # do_reply() will return 'true' as soon as the + # concurrence is over, if this is the case, the + # queue should not be treated anymore end end end def add_child (child) @@ -198,33 +286,58 @@ synchable.ldebug do "#{self.class}.do_reply() "+ "#{workitem.last_expression_id.to_debug_s}" end + @merge_array.push(synchable, workitem) + @reply_count = @reply_count + 1 @remaining_children.delete(workitem.last_expression_id) if @remaining_children.length <= 0 - synchable.reply_to_parent(workitem) - return + reply_to_parent(synchable) + return true end if @count > 0 and @reply_count >= @count treat_remaining_children(synchable) - synchable.reply_to_parent(workitem) - return + reply_to_parent(synchable) + return true end + # + # over-if + + conditional = synchable.eval_condition("over-if", workitem) + + if conditional + treat_remaining_children(synchable) + reply_to_parent(synchable) + return true + end + + # + # not over, resuming + synchable.store_itself() #synchable.ldebug do # "#{self.class}.do_reply() not replying to parent "+ # "#{workitem.last_expression_id.to_debug_s}" #end + + return false end + def reply_to_parent (synchable) + + workitem = @merge_array.do_merge + + synchable.reply_to_parent(workitem) + end + def treat_remaining_children (synchable) expool = synchable.get_expression_pool @remaining_children.each do |child| @@ -258,23 +371,146 @@ return -1 if not s i = s.to_i return -1 if i < 1 return i end - end - # - # Merges a workitem (source) into another (target). - # If inPlace is left to false, a brand new workitem is returned, - # else the merge occurs directly into the target workitem. - # - def merge (wiTarget, wiSource, inPlace=false) + # + # This inner class is used to gather workitems before the final + # merge, which is triggered by calling the do_merge() method + # which returns the merged workitem. + # + class MergeArray - wiTarget = wiTarget.dup if not inPlace + attr_accessor \ + :workitem, + :workitems_by_arrival, + :workitems_by_altitude, + :merge, + :merge_type - wiSource.attributes.each do | k, v | - wiTarget.attributes[k.dup] = v.dup - end + def initialize (merge, merge_type) + + @merge = merge.downcase + @merge_type = merge_type.downcase + + ensure_merge_settings() + + @workitem = nil + + if highest? or lowest? + @workitems_by_arrival = [] + @workitems_by_altitude = [] + end + end + + def push (synchable, wi) + + if not @workitems_by_arrival + # + # last or first + # + source, target = if first? + [ @workitem, wi ] + else + [ wi, @workitem ] + end + @workitem = merge(target, source) + + return + end + + index = synchable.children.index( + wi.last_expression_id) + + @workitems_by_arrival << wi + @workitems_by_altitude[index] = wi + end + + # + # merges the workitems stored here + # + def do_merge + + return @workitem if @workitem + + list = if first? + @workitems_by_arrival.reverse + elsif last? + @workitems_by_arrival + elsif highest? + @workitems_by_altitude.reverse + elsif lowest? + @workitems_by_altitude + end + + result = nil + + list.each do |wi| + next unless wi + result = merge(result, wi) + end + + #puts "___ result :" + #puts result.to_s + #puts + + return result + end + + protected + + def first? + @merge == "first" + end + def last? + @merge == "last" + end + def highest? + @merge == "highest" + end + def lowest? + @merge == "lowest" + end + + def mix? + @merge_type == "mix" + end + def override? + @merge_type == "override" + end + + # + # Making sure @merge and @merge_type are set to + # appropriate values. + # + def ensure_merge_settings + + @merge_type = "mix" unless override? + @merge = "first" unless last? or highest? or lowest? + end + + def merge (wiTarget, wiSource) + + return wiSource unless wiTarget + return wiTarget unless wiSource + + return wiSource if override? + + wiSource.attributes.each do | k, v | + + #puts "merge() '#{k}' => '#{v}'" + + nk = OpenWFE::fulldup(k) + nv = OpenWFE::fulldup(v) + + wiTarget.attributes[nk] = nv + end + + return wiTarget + end + end + end end