lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.8 vs lib/openwfe/expressions/fe_concurrence.rb in openwferu-0.9.9

- old
+ new

@@ -39,12 +39,14 @@ # John Mettraux at openwfe.org # require 'openwfe/utils' require 'openwfe/rudefinitions' +require 'openwfe/expressions/merge' require 'openwfe/expressions/condition' require 'openwfe/expressions/flowexpression' +require 'openwfe/expressions/fe_iterator' # # base expressions like 'sequence' and 'concurrence' # @@ -118,16 +120,20 @@ # <em>merge-type</em> # # [override] The default : no mix of values between the workitems do occur # [mix] Priority is given to the 'winning' workitem but their values # get mixed + # [isolate] the attributes of the workitem of each branch is placed + # in a field in the resulting workitem. For example, the + # attributes of the first branch will be stored under the + # field named '0' of the resulting workitem. # # The merge occurs are the top level of workitem attributes. # # More complex merge behaviour can be obtained by extending the # GenericSyncExpression class. But the default sync options are already - # numerous and powerful. + # numerous and powerful by their combinations. # class ConcurrenceExpression < SequenceExpression include ConditionMixin names :concurrence @@ -137,27 +143,31 @@ def apply (workitem) sync = lookup_attribute(:sync, workitem, :generic) - @sync_expression = \ - get_expression_map().get_sync_class(sync).new(self, workitem) + @sync_expression = + get_expression_map.get_sync_class(sync).new(self, workitem) @children.each do |child| @sync_expression.add_child(child) end store_itself() concurrence = self - @children.each do |child| + @children.each_with_index do |child, index| Thread.new do begin #ldebug { "apply() child : #{child.to_debug_s}" } concurrence.synchronize do - get_expression_pool().apply(child, workitem.dup) + + get_expression_pool().apply( + child, + #workitem.dup) + get_workitem(workitem, index)) end rescue Exception => e lwarn do "apply() " + "caught exception in concurrent child " + @@ -184,13 +194,73 @@ end def reply (workitem) @sync_expression.reply(self, workitem) end + + protected + + def get_workitem (workitem, index) + workitem.dup + end end # + # This expression is a mix between a 'concurrence' and an 'iterator'. + # It understands the same attributes and behaves as an interator that + # forks its children concurrently. + # + # (See ConcurrenceExpression and IteratorExpression). + # + class ConcurrentIteratorExpression < ConcurrenceExpression + + names :concurrent_iterator + + #attr_accessor :iterator + + def apply (workitem) + + if @children.length < 1 + reply_to_parent workitem + return + end + + template = @children[0] + + @children.clear + + @workitems = [] + + iterator = Iterator.new(self, workitem) + + while iterator.has_next? + + wi = workitem.dup + + @workitems << wi + + vars = iterator.next self, wi + + rawexp = get_expression_pool.prepare_from_template( + self, iterator.index, template, vars) + + @children << rawexp.fei + end + + get_expression_pool.remove(template) + + super + end + + protected + + def get_workitem (workitem, index) + @workitems[index] + end + end + + # # A base for sync expressions, currently empty. # That may change. # class SyncExpression < ObjectWithMeta @@ -234,10 +304,12 @@ @cancel_remaining = cancel_remaining?(synchable, workitem) merge = synchable.lookup_attribute(:merge, workitem, :first) merge_type = synchable.lookup_attribute(:merge_type, workitem, :mix) + #synchable.ldebug { "new() merge_type is '#{merge_type}'" } + @merge_array = MergeArray.new(merge, merge_type) @unready_queue = [] end @@ -295,20 +367,25 @@ protected def do_reply (synchable, workitem) synchable.ldebug do - "#{self.class}.do_reply() "+ + "#{self.class}.do_reply() from"+ "#{workitem.last_expression_id.to_debug_s}" end @merge_array.push(synchable, workitem) @reply_count = @reply_count + 1 @remaining_children.delete(workitem.last_expression_id) + #synchable.ldebug do + # "#{self.class}.do_reply() "+ + # "remaining children : #{@remaining_children.length}" + #end + if @remaining_children.length <= 0 reply_to_parent(synchable) return true end @@ -362,12 +439,11 @@ end if @cancel_remaining expool.cancel(child) else - #expool.remove(child) - expool.forget(child) + expool.forget(synchable, child) end end end def cancel_remaining? (synchable_expression, workitem) @@ -386,27 +462,29 @@ return -1 if i < 1 i end # - # This inner class is used to gather workitems before the final - # merge, which is triggered by calling the do_merge() method - # which returns the merged workitem. + # This inner class is used to gather workitems (via push()) before + # the final merge + # This final merge is triggered by calling the do_merge() method + # which will return the resulting, merged workitem. # class MergeArray + include MergeMixin attr_accessor \ :workitem, :workitems_by_arrival, :workitems_by_altitude, :merge, :merge_type def initialize (merge, merge_type) - @merge = merge.downcase - @merge_type = merge_type.downcase + @merge = merge.strip.downcase.intern + @merge_type = merge_type.strip.downcase.intern ensure_merge_settings() @workitem = nil @@ -416,27 +494,53 @@ end end def push (synchable, wi) - if not @workitems_by_arrival - # - # last or first - # - source, target = if first? - [ @workitem, wi ] - else - [ wi, @workitem ] - end - @workitem = merge(target, source) + #synchable.ldebug do + # "push() isolate? #{isolate?}" + #end - return + if isolate? + push_in_isolation wi + elsif last? or first? + push_by_position wi + else + push_by_arrival wi end + end - index = synchable.children.index( - wi.last_expression_id) + def push_by_position (wi) + source, target = if first? + [ @workitem, wi ] + else + [ wi, @workitem ] + end + @workitem = merge_workitems target, source, override? + end + + def push_in_isolation (wi) + + unless @workitem + @workitem = wi.dup + att = @workitem.attributes + @workitem.attributes = {} + end + + #key = synchable.children.index wi.last_expression_id + key = wi.last_expression_id.child_id + + @workitem.attributes[key.to_s] = + OpenWFE::fulldup(wi.attributes) + end + + def push_by_arrival (wi) + + #index = synchable.children.index wi.last_expression_id + index = Integer(wi.last_expression_id.child_id) + @workitems_by_arrival << wi @workitems_by_altitude[index] = wi end # @@ -458,11 +562,11 @@ result = nil list.each do |wi| next unless wi - result = merge(result, wi) + result = merge_workitems result, wi, override? end #puts "___ result :" #puts result.to_s #puts @@ -471,56 +575,39 @@ end protected def first? - @merge == "first" + @merge == :first end def last? - @merge == "last" + @merge == :last end def highest? - @merge == "highest" + @merge == :highest end def lowest? - @merge == "lowest" + @merge == :lowest end def mix? - @merge_type == "mix" + @merge_type == :mix end def override? - @merge_type == "override" + @merge_type == :override end + def isolate? + @merge_type == :isolate + end # # Making sure @merge and @merge_type are set to # appropriate values. # def ensure_merge_settings - @merge_type = "mix" unless override? - @merge = "first" unless last? or highest? or lowest? - end - - def merge (wiTarget, wiSource) - - return wiSource unless wiTarget - return wiTarget unless wiSource - - return wiSource if override? - - wiSource.attributes.each do |k, v| - - #puts "merge() '#{k}' => '#{v}'" - - nk = OpenWFE::fulldup(k) - nv = OpenWFE::fulldup(v) - - wiTarget.attributes[nk] = nv - end - - wiTarget + @merge_type = :mix unless override? or isolate? + @merge = :first unless last? or highest? or lowest? end end end