lib/ruote/exp/fe_concurrence.rb in ruote-2.3.0.1 vs lib/ruote/exp/fe_concurrence.rb in ruote-2.3.0.2

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -20,14 +20,13 @@ # THE SOFTWARE. # # Made in Japan. #++ +require 'ruote/merge' -require 'ruote/exp/merge' - module Ruote::Exp # # The 'concurrence' expression applies its child branches in parallel # (well it makes a best effort to make them run in parallel). @@ -92,10 +91,29 @@ # This concurrence will be over when the branches alpha and bravo have # replied. The charly branch may have replied or not, it doesn't matter. # # :wait_for can be shortened to :wf. # + # + # === :over_if (and :over_unless) attribute + # + # Like the :count attribute controls how many branches have to reply before + # a concurrence ends, the :over attribute is used to specify a condition + # upon which the concurrence will [prematurely] end. + # + # concurrence :over_if => '${f:over}' + # alpha + # bravo + # charly + # end + # + # will end the concurrence as soon as one of the branches replies with a + # workitem whose field 'over' is set to true. (the remaining branches will + # get cancelled unless :remaining => :forget is set). + # + # :over_unless needs no explanation. + # # === :remaining # # As said for :count, the remaining branches get cancelled. By setting # :remaining to :forget (or 'forget'), the remaining branches will continue # their execution, forgotten. @@ -150,27 +168,27 @@ # # :merge can be shortened to :m. # # === :merge_type # - # ==== :override + # ==== :merge_type => :override (default) # # By default, the merge type is set to 'override', which means that the # 'winning' workitem's payload supplants all other workitems' payloads. # - # ==== :mix + # ==== :merge_type => :mix # # Setting :merge_type to :mix, will actually attempt to merge field by field, # making sure that the field value of the winner(s) are used. # - # ==== :isolate + # ==== :merge_type => :isolate # # :isolate will rearrange the resulting workitem payload so that there is # a new field for each branch. The name of each field is the index of the # branch from '0' to ... # - # ==== :stack + # ==== :merge_type => :stack # # :stack will stack the workitems coming back from the concurrence branches # in an array whose order is determined by the :merge attributes. The array # is placed in the 'stack' field of the resulting workitem. # Note that the :stack merge_type also creates a 'stack_attributes' field @@ -192,11 +210,11 @@ # 'stack_attributes' => { 'merge'=> 'highest', 'merge_type' => 'stack' } } # # This could prove useful for participant having to deal with multiple merge # strategy results. # - # ==== :union + # ==== :merge_type => :union # # (Available from ruote 2.3.0) # # Will override atomic fields, concat arrays and merge hashes... # @@ -212,11 +230,11 @@ # 'c' => { 'aa' => 'bb', 'cc' => 'dd' } } # # Warning: duplicates in arrays present _before_ the merge will be removed # as well. # - # ==== :concat + # ==== :merge_type => :concat # # (Available from ruote 2.3.0) # # Much like :union, but duplicates are not removed. Thus # @@ -227,54 +245,41 @@ # # { 'a' => 1, # 'b' => [ 'x', 'y', 'y', 'z' ], # 'c' => { 'aa' => 'bb', 'cc' => 'dd' } } # - # ==== :deep + # ==== :merge_type => :deep # # (Available from ruote 2.3.0) # # Identical to :concat but hashes are merged with deep_merge (ActiveSupport # flavour). # - # ==== :ignore + # ==== :merge_type => :ignore # # (Available from ruote 2.3.0) # # A very simple merge type, the workitems given back by the branches are # simply discarded and the workitem as passed to the concurrence expression # is used to reply to the parent expression (of the concurrence expression). # # :merge_type can be shortened to :mt. # - # - # === :over_if (and :over_unless) attribute - # - # Like the :count attribute controls how many branches have to reply before - # a concurrence ends, the :over attribute is used to specify a condition - # upon which the concurrence will [prematurely] end. - # - # concurrence :over_if => '${f:over}' - # alpha - # bravo - # charly - # end - # - # will end the concurrence as soon as one of the branches replies with a - # workitem whose field 'over' is set to true. (the remaining branches will - # get cancelled unless :remaining => :forget is set). - # - # :over_unless needs no explanation. - # class ConcurrenceExpression < FlowExpression - include MergeMixin - names :concurrence COUNT_R = /^-?\d+$/ + # This method is used by some walking routines when analyzsing + # execution trees. Returns true for concurrence (and concurrent iterator). + # + def is_concurrent? + + true + end + def apply return do_reply_to_parent(h.applied_workitem) if tree_children.empty? # @@ -302,11 +307,18 @@ %w[ override mix isolate stack union ignore concat deep ]) h.remaining = att( [ :remaining, :rem, :r ], %w[ cancel forget wait ]) - h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {} + #h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {} + # + # now merging iteratively, not keeping track of all the workitems, + # but still able to deal with old flows with active h.workitems + # + h.workitems = [] if %w[ highest lowest ].include?(h.cmerge) + # + # still need to keep track of rank to get the right merging h.over = false apply_children @@ -323,24 +335,22 @@ # # since workitem field merging might happen, better to work on # a copy of the workitem (so that history, coming afterwards, # doesn't see a modified version of the workitem) - if h.cmerge == 'first' || h.cmerge == 'last' - h.workitems << workitem - else - h.workitems[workitem['fei']['expid']] = workitem - end - if h.wait_for && tag = workitem['fields']['__left_tag__'] h.wait_for.delete(tag) end over = h.over h.over = over || over?(workitem) + keep(workitem) + # is done after the over? determination for its looks at 'winner' + if (not over) && h.over + # # just became 'over' reply_to_parent(nil) elsif h.over && h.remaining == 'wait' @@ -360,10 +370,46 @@ end end protected + def keep(workitem) + + h.workitems = h.workitems.values if h.workitems.is_a?(Hash) + # align legacy expressions on new simplified way + + if h.workitems + # + # the old way (still used for highest / lowest) + + h.workitems << workitem + return + end + + # + # the new way, merging immediately + + h.workitem_count = (h.workitem_count || 0) + 1 + + return if h.cmerge_type == 'ignore' + + # preparing target and source in the right order for merging + + target, source = h.workitem, workitem + if + h.cmerge == 'first' && + ! %w[ stack union concat deep isolate ].include?(h.cmerge_type) + then + target, source = source, target + end + target, source = source, target if target && target.delete('winner') + target, source = source, target if source == nil + + h.workitem = Ruote.merge_workitem( + workitem_index(workitem), target, source, h.cmerge_type) + end + def apply_children # a) register children # b) persist # c) trigger children applies @@ -391,11 +437,12 @@ workitem['winner'] = true true elsif h.wait_for h.wait_for.empty? else - (h.workitems.size >= expected_count) + (workitem_count + 1 >= expected_count) + # the + 1 is necessary since #keep hasn't yet been called end end # How many branch replies are expected before the concurrence is over ? # @@ -423,17 +470,17 @@ # # remaining 'wait' case first if h.remaining == 'wait' - if h.workitems.size >= count_list_size + if workitem_count >= count_list_size # # all children have replied - workitem = merge_all_workitems + h.workitem = final_merge - do_unpersist && super(workitem, false) + do_unpersist && super(h.workitem, false) elsif h.children_cancelled == nil # # the concurrence is over, let's cancel all children and then # wait for them @@ -448,58 +495,107 @@ end # # remaining 'forget' and 'cancel' cases - workitem = merge_all_workitems + h.workitem = final_merge if h.children.empty? - do_unpersist && super(workitem, false) + do_unpersist && super(h.workitem, false) elsif h.remaining == 'cancel' if do_unpersist - super(workitem, false) + super(h.workitem, false) h.children.each { |i| @context.storage.put_msg('cancel', 'fei' => i) } end else # h.remaining == 'forget' h.variables = compile_variables h.forgotten = true - do_persist && super(workitem, false) + do_persist && super(h.workitem, false) end end # Called by #reply_to_parent, returns the unique, merged, workitem that # will be fed back to the parent expression. # - def merge_all_workitems + def final_merge - return h.applied_workitem if h.workitems.size < 1 - return h.applied_workitem if h.cmerge_type == 'ignore' + wi = if h.workitem - wis = case h.cmerge - when 'first' - h.workitems.reverse - when 'last' - h.workitems - when 'highest', 'lowest' - is = h.workitems.keys.sort.collect { |k| h.workitems[k] } - h.cmerge == 'highest' ? is.reverse : is + h.workitem + + elsif h.cmerge_type == 'ignore' || h.workitems.nil? || h.workitems.empty? + + h.applied_workitem + + else + + wis = h.workitems + + if %w[ highest lowest ].include?(h.cmerge) + wis = h.workitems.sort_by { |wi| wi['fei']['expid'] } + end + + if + %w[ first highest ].include?(h.cmerge) && + ! %w[ stack union concat deep ].include?(h.cmerge_type) + then + wis = wis.reverse + end + + as, bs = wis.partition { |wi| wi.delete('winner') } + wis = bs + as + # + # the 'winner' is the workitem that triggered successfully the + # :over_if or :over_unless, let's take him precedence in the merge... + + merge_workitems(wis, h.cmerge_type) end - as, bs = wis.partition { |wi| wi.delete('winner') } - wis = bs + as - # - # the 'winner' is the workitem that triggered successfully the - # :over_if or :over_unless, let's take him precedence in the merge... + if h.cmerge_type == 'stack' + wi['fields']['stack_attributes'] = compile_atts + end - merge_workitems(wis, h.cmerge_type) + wi + end + + # Returns the current count of workitem replies. + # + def workitem_count + + h.workitems ? h.workitems.size : (h.workitem_count || 0) + end + + # Given a workitem, returns its index (highest to lowest in the tree + # children... z-index?). + # + # Is overriden by the concurrent-iterator. + # + def workitem_index(workitem) + + Ruote.extract_child_id(workitem['fei']) + end + + # Given a list of workitems and a merge_type, will merge according to + # the merge type. + # + # The return value is the merged workitem. + # + # (Still used when dealing with highest/lowest merge_type and legacy + # concurrence/citerator expressions) + # + def merge_workitems(workitems, merge_type) + + workitems.inject(nil) do |t, wi| + Ruote.merge_workitem(workitem_index(wi), t, wi, merge_type) + end end end end