lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.4 vs lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.5
- old
+ new
@@ -39,20 +39,98 @@
# John Mettraux at openwfe.org
#
require 'openwfe/utils'
require 'openwfe/rudefinitions'
+require 'openwfe/expressions/condition'
require 'openwfe/expressions/flowexpression'
#
# base expressions like 'sequence' and 'concurrence'
#
module OpenWFE
+ #
+ # The concurrence expression will execute each of its (direct) children
+ # in parallel threads.
+ #
+ # Thus,
+ #
+ # <concurrence>
+ # <participant ref="pa" />
+ # <participant ref="pb" />
+ # </concurrence>
+ #
+ # Participants pa and pb will be 'treated' in parallel (quasi
+ # simultaneously).
+ #
+ # The concurrence expressions accept many attributes, that can get
+ # combined. By default, the concurrence waits for all its children to
+ # reply and returns the workitem of the first child that replied.
+ # The attributes tune this behaviour.
+ #
+ # <em>count</em>
+ #
+ # <concurrence count="1">
+ # <participant ref="pa" />
+ # <participant ref="pb" />
+ # </concurrence>
+ #
+ # The concurrence will be over as soon as 'pa' or 'pb' replied, i.e.
+ # as soon as "1" child replied.
+ #
+ # <em>remaining</em>
+ #
+ # The attribute 'remaining' can take two values 'cancel' (the default) and
+ # 'forget'.
+ # Cancelled children are completely wiped away, forgotten ones continue
+ # to operate but their reply will simply get discarded.
+ #
+ # <em>over-if</em>
+ #
+ # 'over-if' accepts a 'boolean expression' (something replying 'true' or
+ # 'false'), if the expression evaluates to true, the concurrence will be
+ # over and the remaining children will get cancelled (the default) or
+ # forgotten.
+ #
+ # <em>merge</em>
+ #
+ # By default, the first child to reply to its parent 'concurrence'
+ # expression 'wins', i.e. its workitem is used for resuming the flow (after
+ # the concurrence).
+ #
+ # [first] The default : the first child to reply wins
+ # [last] The last child to reply wins
+ # [highest] The first 'defined' child (in the list of children) will win
+ # [lowest] The last 'defined' child (in the list of children) will win
+ #
+ # Thus, in that example
+ #
+ # <concurrence merge="lowest">
+ # <participant ref="pa" />
+ # <participant ref="pb" />
+ # </concurrence>
+ #
+ # when the concurrence is done, the workitem of 'pb' is used to resume the
+ # flow after the concurrence.
+ #
+ # <em>merge-type</em>
+ #
+ # [override] The default : no mix of values between the workitems do occur
+ # [mix] Priority is given to the 'winning' workitem but their values
+ # get mixed
+ #
+ # The merge occurs are the top level of workitem attributes.
+ #
+ # More complex merge behaviour can be obtained by extending the
+ # GenericSyncExpression class. But the default sync options are already
+ # numerous and powerful.
+ #
class ConcurrenceExpression < SequenceExpression
+ include ConditionMixin
attr_accessor \
:sync_expression
def apply (workitem)
@@ -71,10 +149,11 @@
concurrence = self
@children.each do |child|
Thread.new do
begin
+ #ldebug { "apply() child : #{child.to_debug_s}" }
concurrence.synchronize do
get_expression_pool().apply(child, workitem.dup)
end
rescue Exception => e
lwarn do
@@ -139,10 +218,15 @@
@reply_count = 0
@count = determine_count(synchable, workitem)
@cancel_remaining = cancel_remaining?(synchable, workitem)
+ merge = synchable.lookup_attribute(:merge, workitem, :first)
+ merge_type = synchable.lookup_attribute(:merge_type, workitem, :mix)
+
+ @merge_array = MergeArray.new(merge, merge_type)
+
@unready_queue = []
end
#
# when all the children got applied concurrently, the concurrence
@@ -160,11 +244,15 @@
queue = @unready_queue
@unready_queue = nil
synchable.store_itself()
queue.each do |workitem|
- do_reply(synchable, workitem)
+ break if do_reply(synchable, workitem)
+ #
+ # do_reply() will return 'true' as soon as the
+ # concurrence is over, if this is the case, the
+ # queue should not be treated anymore
end
end
end
def add_child (child)
@@ -198,33 +286,58 @@
synchable.ldebug do
"#{self.class}.do_reply() "+
"#{workitem.last_expression_id.to_debug_s}"
end
+ @merge_array.push(synchable, workitem)
+
@reply_count = @reply_count + 1
@remaining_children.delete(workitem.last_expression_id)
if @remaining_children.length <= 0
- synchable.reply_to_parent(workitem)
- return
+ reply_to_parent(synchable)
+ return true
end
if @count > 0 and @reply_count >= @count
treat_remaining_children(synchable)
- synchable.reply_to_parent(workitem)
- return
+ reply_to_parent(synchable)
+ return true
end
+ #
+ # over-if
+
+ conditional = synchable.eval_condition("over-if", workitem)
+
+ if conditional
+ treat_remaining_children(synchable)
+ reply_to_parent(synchable)
+ return true
+ end
+
+ #
+ # not over, resuming
+
synchable.store_itself()
#synchable.ldebug do
# "#{self.class}.do_reply() not replying to parent "+
# "#{workitem.last_expression_id.to_debug_s}"
#end
+
+ return false
end
+ def reply_to_parent (synchable)
+
+ workitem = @merge_array.do_merge
+
+ synchable.reply_to_parent(workitem)
+ end
+
def treat_remaining_children (synchable)
expool = synchable.get_expression_pool
@remaining_children.each do |child|
@@ -258,23 +371,146 @@
return -1 if not s
i = s.to_i
return -1 if i < 1
return i
end
- end
- #
- # Merges a workitem (source) into another (target).
- # If inPlace is left to false, a brand new workitem is returned,
- # else the merge occurs directly into the target workitem.
- #
- def merge (wiTarget, wiSource, inPlace=false)
+ #
+ # This inner class is used to gather workitems before the final
+ # merge, which is triggered by calling the do_merge() method
+ # which returns the merged workitem.
+ #
+ class MergeArray
- wiTarget = wiTarget.dup if not inPlace
+ attr_accessor \
+ :workitem,
+ :workitems_by_arrival,
+ :workitems_by_altitude,
+ :merge,
+ :merge_type
- wiSource.attributes.each do | k, v |
- wiTarget.attributes[k.dup] = v.dup
- end
+ def initialize (merge, merge_type)
+
+ @merge = merge.downcase
+ @merge_type = merge_type.downcase
+
+ ensure_merge_settings()
+
+ @workitem = nil
+
+ if highest? or lowest?
+ @workitems_by_arrival = []
+ @workitems_by_altitude = []
+ end
+ end
+
+ def push (synchable, wi)
+
+ if not @workitems_by_arrival
+ #
+ # last or first
+ #
+ source, target = if first?
+ [ @workitem, wi ]
+ else
+ [ wi, @workitem ]
+ end
+ @workitem = merge(target, source)
+
+ return
+ end
+
+ index = synchable.children.index(
+ wi.last_expression_id)
+
+ @workitems_by_arrival << wi
+ @workitems_by_altitude[index] = wi
+ end
+
+ #
+ # merges the workitems stored here
+ #
+ def do_merge
+
+ return @workitem if @workitem
+
+ list = if first?
+ @workitems_by_arrival.reverse
+ elsif last?
+ @workitems_by_arrival
+ elsif highest?
+ @workitems_by_altitude.reverse
+ elsif lowest?
+ @workitems_by_altitude
+ end
+
+ result = nil
+
+ list.each do |wi|
+ next unless wi
+ result = merge(result, wi)
+ end
+
+ #puts "___ result :"
+ #puts result.to_s
+ #puts
+
+ return result
+ end
+
+ protected
+
+ def first?
+ @merge == "first"
+ end
+ def last?
+ @merge == "last"
+ end
+ def highest?
+ @merge == "highest"
+ end
+ def lowest?
+ @merge == "lowest"
+ end
+
+ def mix?
+ @merge_type == "mix"
+ end
+ def override?
+ @merge_type == "override"
+ end
+
+ #
+ # Making sure @merge and @merge_type are set to
+ # appropriate values.
+ #
+ def ensure_merge_settings
+
+ @merge_type = "mix" unless override?
+ @merge = "first" unless last? or highest? or lowest?
+ end
+
+ def merge (wiTarget, wiSource)
+
+ return wiSource unless wiTarget
+ return wiTarget unless wiSource
+
+ return wiSource if override?
+
+ wiSource.attributes.each do | k, v |
+
+ #puts "merge() '#{k}' => '#{v}'"
+
+ nk = OpenWFE::fulldup(k)
+ nv = OpenWFE::fulldup(v)
+
+ wiTarget.attributes[nk] = nv
+ end
+
+ return wiTarget
+ end
+ end
+
end
end