lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.8 vs lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.9
- old
+ new
@@ -39,12 +39,14 @@
# John Mettraux at openwfe.org
#
require 'openwfe/utils'
require 'openwfe/rudefinitions'
+require 'openwfe/expressions/merge'
require 'openwfe/expressions/condition'
require 'openwfe/expressions/flowexpression'
+require 'openwfe/expressions/fe_iterator'
#
# base expressions like 'sequence' and 'concurrence'
#
@@ -118,16 +120,20 @@
# <em>merge-type</em>
#
# [override] The default : no mix of values between the workitems do occur
# [mix] Priority is given to the 'winning' workitem but their values
# get mixed
+ # [isolate] the attributes of the workitem of each branch is placed
+ # in a field in the resulting workitem. For example, the
+ # attributes of the first branch will be stored under the
+ # field named '0' of the resulting workitem.
#
# The merge occurs are the top level of workitem attributes.
#
# More complex merge behaviour can be obtained by extending the
# GenericSyncExpression class. But the default sync options are already
- # numerous and powerful.
+ # numerous and powerful by their combinations.
#
class ConcurrenceExpression < SequenceExpression
include ConditionMixin
names :concurrence
@@ -137,27 +143,31 @@
def apply (workitem)
sync = lookup_attribute(:sync, workitem, :generic)
- @sync_expression = \
- get_expression_map().get_sync_class(sync).new(self, workitem)
+ @sync_expression =
+ get_expression_map.get_sync_class(sync).new(self, workitem)
@children.each do |child|
@sync_expression.add_child(child)
end
store_itself()
concurrence = self
- @children.each do |child|
+ @children.each_with_index do |child, index|
Thread.new do
begin
#ldebug { "apply() child : #{child.to_debug_s}" }
concurrence.synchronize do
- get_expression_pool().apply(child, workitem.dup)
+
+ get_expression_pool().apply(
+ child,
+ #workitem.dup)
+ get_workitem(workitem, index))
end
rescue Exception => e
lwarn do
"apply() " +
"caught exception in concurrent child " +
@@ -184,13 +194,73 @@
end
def reply (workitem)
@sync_expression.reply(self, workitem)
end
+
+ protected
+
+ def get_workitem (workitem, index)
+ workitem.dup
+ end
end
#
+ # This expression is a mix between a 'concurrence' and an 'iterator'.
+ # It understands the same attributes and behaves as an interator that
+ # forks its children concurrently.
+ #
+ # (See ConcurrenceExpression and IteratorExpression).
+ #
+ class ConcurrentIteratorExpression < ConcurrenceExpression
+
+ names :concurrent_iterator
+
+ #attr_accessor :iterator
+
+ def apply (workitem)
+
+ if @children.length < 1
+ reply_to_parent workitem
+ return
+ end
+
+ template = @children[0]
+
+ @children.clear
+
+ @workitems = []
+
+ iterator = Iterator.new(self, workitem)
+
+ while iterator.has_next?
+
+ wi = workitem.dup
+
+ @workitems << wi
+
+ vars = iterator.next self, wi
+
+ rawexp = get_expression_pool.prepare_from_template(
+ self, iterator.index, template, vars)
+
+ @children << rawexp.fei
+ end
+
+ get_expression_pool.remove(template)
+
+ super
+ end
+
+ protected
+
+ def get_workitem (workitem, index)
+ @workitems[index]
+ end
+ end
+
+ #
# A base for sync expressions, currently empty.
# That may change.
#
class SyncExpression < ObjectWithMeta
@@ -234,10 +304,12 @@
@cancel_remaining = cancel_remaining?(synchable, workitem)
merge = synchable.lookup_attribute(:merge, workitem, :first)
merge_type = synchable.lookup_attribute(:merge_type, workitem, :mix)
+ #synchable.ldebug { "new() merge_type is '#{merge_type}'" }
+
@merge_array = MergeArray.new(merge, merge_type)
@unready_queue = []
end
@@ -295,20 +367,25 @@
protected
def do_reply (synchable, workitem)
synchable.ldebug do
- "#{self.class}.do_reply() "+
+ "#{self.class}.do_reply() from"+
"#{workitem.last_expression_id.to_debug_s}"
end
@merge_array.push(synchable, workitem)
@reply_count = @reply_count + 1
@remaining_children.delete(workitem.last_expression_id)
+ #synchable.ldebug do
+ # "#{self.class}.do_reply() "+
+ # "remaining children : #{@remaining_children.length}"
+ #end
+
if @remaining_children.length <= 0
reply_to_parent(synchable)
return true
end
@@ -362,12 +439,11 @@
end
if @cancel_remaining
expool.cancel(child)
else
- #expool.remove(child)
- expool.forget(child)
+ expool.forget(synchable, child)
end
end
end
def cancel_remaining? (synchable_expression, workitem)
@@ -386,27 +462,29 @@
return -1 if i < 1
i
end
#
- # This inner class is used to gather workitems before the final
- # merge, which is triggered by calling the do_merge() method
- # which returns the merged workitem.
+ # This inner class is used to gather workitems (via push()) before
+ # the final merge
+ # This final merge is triggered by calling the do_merge() method
+ # which will return the resulting, merged workitem.
#
class MergeArray
+ include MergeMixin
attr_accessor \
:workitem,
:workitems_by_arrival,
:workitems_by_altitude,
:merge,
:merge_type
def initialize (merge, merge_type)
- @merge = merge.downcase
- @merge_type = merge_type.downcase
+ @merge = merge.strip.downcase.intern
+ @merge_type = merge_type.strip.downcase.intern
ensure_merge_settings()
@workitem = nil
@@ -416,27 +494,53 @@
end
end
def push (synchable, wi)
- if not @workitems_by_arrival
- #
- # last or first
- #
- source, target = if first?
- [ @workitem, wi ]
- else
- [ wi, @workitem ]
- end
- @workitem = merge(target, source)
+ #synchable.ldebug do
+ # "push() isolate? #{isolate?}"
+ #end
- return
+ if isolate?
+ push_in_isolation wi
+ elsif last? or first?
+ push_by_position wi
+ else
+ push_by_arrival wi
end
+ end
- index = synchable.children.index(
- wi.last_expression_id)
+ def push_by_position (wi)
+ source, target = if first?
+ [ @workitem, wi ]
+ else
+ [ wi, @workitem ]
+ end
+ @workitem = merge_workitems target, source, override?
+ end
+
+ def push_in_isolation (wi)
+
+ unless @workitem
+ @workitem = wi.dup
+ att = @workitem.attributes
+ @workitem.attributes = {}
+ end
+
+ #key = synchable.children.index wi.last_expression_id
+ key = wi.last_expression_id.child_id
+
+ @workitem.attributes[key.to_s] =
+ OpenWFE::fulldup(wi.attributes)
+ end
+
+ def push_by_arrival (wi)
+
+ #index = synchable.children.index wi.last_expression_id
+ index = Integer(wi.last_expression_id.child_id)
+
@workitems_by_arrival << wi
@workitems_by_altitude[index] = wi
end
#
@@ -458,11 +562,11 @@
result = nil
list.each do |wi|
next unless wi
- result = merge(result, wi)
+ result = merge_workitems result, wi, override?
end
#puts "___ result :"
#puts result.to_s
#puts
@@ -471,56 +575,39 @@
end
protected
def first?
- @merge == "first"
+ @merge == :first
end
def last?
- @merge == "last"
+ @merge == :last
end
def highest?
- @merge == "highest"
+ @merge == :highest
end
def lowest?
- @merge == "lowest"
+ @merge == :lowest
end
def mix?
- @merge_type == "mix"
+ @merge_type == :mix
end
def override?
- @merge_type == "override"
+ @merge_type == :override
end
+ def isolate?
+ @merge_type == :isolate
+ end
#
# Making sure @merge and @merge_type are set to
# appropriate values.
#
def ensure_merge_settings
- @merge_type = "mix" unless override?
- @merge = "first" unless last? or highest? or lowest?
- end
-
- def merge (wiTarget, wiSource)
-
- return wiSource unless wiTarget
- return wiTarget unless wiSource
-
- return wiSource if override?
-
- wiSource.attributes.each do |k, v|
-
- #puts "merge() '#{k}' => '#{v}'"
-
- nk = OpenWFE::fulldup(k)
- nv = OpenWFE::fulldup(v)
-
- wiTarget.attributes[nk] = nv
- end
-
- wiTarget
+ @merge_type = :mix unless override? or isolate?
+ @merge = :first unless last? or highest? or lowest?
end
end
end