lib/ruote/exp/fe_concurrence.rb in ruote-2.2.0 vs lib/ruote/exp/fe_concurrence.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -50,10 +50,52 @@ # end # # in that example, the concurrence will terminate as soon as 1 (count) of # the branches replies. The other branch will get cancelled. # + # :count and :wait_for may point to a negative integer, meaning "all but + # x". + # + # concurrence :count => -2 do # all the branches replied but 2 + # # ... + # end + # + # :count can be shortened to :c. + # + # === :wait_for + # + # This attribute accepts either an integer, either a list of tags. + # + # When used with the integer, it's equivalent to the :count attribute: + # + # concurrence :wait_for => 1 do + # # ... + # end + # + # It waits for 1 branch to respond and then moves on (concurrence over). + # + # When used with a string (or an array), it extracts a list of tags and waits + # for the branches with those tags. Once all the tags have replied, + # the concurrence is over. + # + # concurrence :wait_for => 'alpha, bravo' do + # sequence :tag => 'alpha' do + # # ... + # end + # sequence :tag => 'bravo' do + # # ... + # end + # sequence :tag => 'charly' do + # # ... + # end + # end + # + # This concurrence will be over when the branches alpha and bravo have + # replied. The charly branch may have replied or not, it doesn't matter. + # + # :wait_for can be shortened to :wf. + # # === :remaining # # As said for :count, the remaining branches get cancelled. By setting # :remaining to :forget (or 'forget'), the remaining branches will continue # their execution, forgotten. @@ -61,10 +103,19 @@ # concurrence :count => 1, :remaining => :forget do # alpha # bravo # end # + # :remaining can be shortened to :rem or :r. + # + # The default is 'cancel', where all the remaining branches are cancelled + # while the hand is given back to the main flow. + # + # There is a third setting, 'wait'. It behaves like 'cancel', but the + # concurrence waits for the cancelled children to reply. The workitems + # from cancelled branches are merged in as well. + # # === :merge # # By default, the workitems override each others. By default, the first # workitem to reply will win. # @@ -95,10 +146,12 @@ # bravo # end # # makes sure that alpha's version of the workitem wins. # + # :merge can be shortened to :m. + # # === :merge_type # # ==== :override # # By default, the merge type is set to 'override', which means that the @@ -139,13 +192,65 @@ # 'stack_attributes' => { 'merge'=> 'highest', 'merge_type' => 'stack' } } # # This could prove useful for participant having to deal with multiple merge # strategy results. # + # ==== :union # - # === :over_if (and :over_unless) + # (Available from ruote 2.3.0) # + # Will override atomic fields, concat arrays and merge hashes... + # + # The union of those two workitems + # + # { 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' } + # { 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' } + # + # will be + # + # { 'a' => 1, + # 'b' => [ 'x', 'y', 'z' ], + # 'c' => { 'aa' => 'bb', 'cc' => 'dd' } } + # + # Warning: duplicates in arrays present _before_ the merge will be removed + # as well. + # + # ==== :concat + # + # (Available from ruote 2.3.0) + # + # Much like :union, but duplicates are not removed. Thus + # + # { 'a' => 0, 'b' => [ 'x', 'y' ], 'c' => { 'aa' => 'bb' } + # { 'a' => 1, 'b' => [ 'y', 'z' ], 'c' => { 'cc' => 'dd' } + # + # will be + # + # { 'a' => 1, + # 'b' => [ 'x', 'y', 'y', 'z' ], + # 'c' => { 'aa' => 'bb', 'cc' => 'dd' } } + # + # ==== :deep + # + # (Available from ruote 2.3.0) + # + # Identical to :concat but hashes are merged with deep_merge (ActiveSupport + # flavour). + # + # ==== :ignore + # + # (Available from ruote 2.3.0) + # + # A very simple merge type, the workitems given back by the branches are + # simply discarded and the workitem as passed to the concurrence expression + # is used to reply to the parent expression (of the concurrence expression). + # + # :merge_type can be shortened to :mt. + # + # + # === :over_if (and :over_unless) attribute + # # Like the :count attribute controls how many branches have to reply before # a concurrence ends, the :over attribute is used to specify a condition # upon which the concurrence will [prematurely] end. # # concurrence :over_if => '${f:over}' @@ -164,42 +269,86 @@ include MergeMixin names :concurrence + COUNT_R = /^-?\d+$/ + def apply - h.ccount = attribute(:count).to_i rescue 0 - h.ccount = nil if h.ccount < 1 + return do_reply_to_parent(h.applied_workitem) if tree_children.empty? - h.cmerge = att(:merge, %w[ first last highest lowest ]) - h.cmerge_type = att(:merge_type, %w[ override mix isolate stack ]) - h.remaining = att(:remaining, %w[ cancel forget ]) + # + # count and wait_for + count = (attribute(:count) || attribute(:c)).to_s + count = nil unless COUNT_R.match(count) + + wf = count || attribute(:wait_for) || attribute(:wf) + + if COUNT_R.match(wf.to_s) + h.ccount = wf.to_i + elsif wf + h.wait_for = Ruote.comma_split(wf) + end + + # + # other attributes + + h.cmerge = att( + [ :merge, :m ], + %w[ first last highest lowest ]) + h.cmerge_type = att( + [ :merge_type, :mt ], + %w[ override mix isolate stack union ignore concat deep ]) + h.remaining = att( + [ :remaining, :rem, :r ], + %w[ cancel forget wait ]) + h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {} h.over = false apply_children + + @context.storage.put_msg( + 'reply', 'fei' => h.fei, 'workitem' => h.applied_workitem + ) if h.ccount == 0 + # + # force an immediate reply end def reply(workitem) + workitem = Ruote.fulldup(workitem) + # + # since workitem field merging might happen, better to work on + # a copy of the workitem (so that history, coming afterwards, + # doesn't see a modified version of the workitem) + if h.cmerge == 'first' || h.cmerge == 'last' h.workitems << workitem else h.workitems[workitem['fei']['expid']] = workitem end + if h.wait_for && tag = workitem['fields']['__left_tag__'] + h.wait_for.delete(tag) + end + over = h.over h.over = over || over?(workitem) if (not over) && h.over # just became 'over' reply_to_parent(nil) + elsif h.over && h.remaining == 'wait' + + reply_to_parent(nil) + elsif h.children.empty? do_unpersist || return @context.storage.put_msg( @@ -234,44 +383,88 @@ over_if = attribute(:over_if, workitem) over_unless = attribute(:over_unless, workitem) if over_if && Condition.true?(over_if) + workitem['winner'] = true true elsif over_unless && (not Condition.true?(over_unless)) + workitem['winner'] = true true + elsif h.wait_for + h.wait_for.empty? else (h.workitems.size >= expected_count) end end # How many branch replies are expected before the concurrence is over ? # + def expected_count + + if h.ccount.nil? + count_list_size + elsif h.ccount >= 0 + [ h.ccount, count_list_size ].min + else # all but 1, 2, ... + i = count_list_size + h.ccount + i < 1 ? 1 : i + end + end + # (note : concurrent_iterator overrides it) # - def expected_count + def count_list_size - h.ccount ? [ h.ccount, tree_children.size ].min : tree_children.size + tree_children.size end def reply_to_parent(_workitem) + # + # remaining 'wait' case first + + if h.remaining == 'wait' + + if h.workitems.size >= count_list_size + # + # all children have replied + + workitem = merge_all_workitems + + do_unpersist && super(workitem, false) + + elsif h.children_cancelled == nil + # + # the concurrence is over, let's cancel all children and then + # wait for them + + h.children_cancelled = true + do_persist + + h.children.each { |i| @context.storage.put_msg('cancel', 'fei' => i) } + end + + return + end + + # + # remaining 'forget' and 'cancel' cases + workitem = merge_all_workitems - if h.ccount == nil || h.children.empty? + if h.children.empty? do_unpersist && super(workitem, false) elsif h.remaining == 'cancel' - if r = do_unpersist + if do_unpersist super(workitem, false) - h.children.each do |i| - @context.storage.put_msg('cancel', 'fei' => i) #unless replied?(i) - end + h.children.each { |i| @context.storage.put_msg('cancel', 'fei' => i) } end else # h.remaining == 'forget' h.variables = compile_variables @@ -279,27 +472,34 @@ do_persist && super(workitem, false) end end + # Called by #reply_to_parent, returns the unique, merged, workitem that + # will be fed back to the parent expression. + # def merge_all_workitems return h.applied_workitem if h.workitems.size < 1 + return h.applied_workitem if h.cmerge_type == 'ignore' wis = case h.cmerge when 'first' h.workitems.reverse when 'last' h.workitems when 'highest', 'lowest' is = h.workitems.keys.sort.collect { |k| h.workitems[k] } h.cmerge == 'highest' ? is.reverse : is end - rwis = wis.reverse - wis.inject(nil) { |t, wi| - merge_workitems(rwis.index(wi), t, wi, h.cmerge_type) - } + as, bs = wis.partition { |wi| wi.delete('winner') } + wis = bs + as + # + # the 'winner' is the workitem that triggered successfully the + # :over_if or :over_unless, let's take him precedence in the merge... + + merge_workitems(wis, h.cmerge_type) end end end