lib/ruote/exp/fe_concurrence.rb in ruote-2.3.0.1 vs lib/ruote/exp/fe_concurrence.rb in ruote-2.3.0.2
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -20,14 +20,13 @@
# THE SOFTWARE.
#
# Made in Japan.
#++
+require 'ruote/merge'
-require 'ruote/exp/merge'
-
module Ruote::Exp
#
# The 'concurrence' expression applies its child branches in parallel
# (well it makes a best effort to make them run in parallel).
@@ -92,10 +91,29 @@
# This concurrence will be over when the branches alpha and bravo have
# replied. The charly branch may have replied or not, it doesn't matter.
#
# :wait_for can be shortened to :wf.
#
+ #
+ # === :over_if (and :over_unless) attribute
+ #
+ # Like the :count attribute controls how many branches have to reply before
+ # a concurrence ends, the :over attribute is used to specify a condition
+ # upon which the concurrence will [prematurely] end.
+ #
+ # concurrence :over_if => '${f:over}'
+ # alpha
+ # bravo
+ # charly
+ # end
+ #
+ # will end the concurrence as soon as one of the branches replies with a
+ # workitem whose field 'over' is set to true. (the remaining branches will
+ # get cancelled unless :remaining => :forget is set).
+ #
+ # :over_unless needs no explanation.
+ #
# === :remaining
#
# As said for :count, the remaining branches get cancelled. By setting
# :remaining to :forget (or 'forget'), the remaining branches will continue
# their execution, forgotten.
@@ -150,27 +168,27 @@
#
# :merge can be shortened to :m.
#
# === :merge_type
#
- # ==== :override
+ # ==== :merge_type => :override (default)
#
# By default, the merge type is set to 'override', which means that the
# 'winning' workitem's payload supplants all other workitems' payloads.
#
- # ==== :mix
+ # ==== :merge_type => :mix
#
# Setting :merge_type to :mix, will actually attempt to merge field by field,
# making sure that the field value of the winner(s) are used.
#
- # ==== :isolate
+ # ==== :merge_type => :isolate
#
# :isolate will rearrange the resulting workitem payload so that there is
# a new field for each branch. The name of each field is the index of the
# branch from '0' to ...
#
- # ==== :stack
+ # ==== :merge_type => :stack
#
# :stack will stack the workitems coming back from the concurrence branches
# in an array whose order is determined by the :merge attributes. The array
# is placed in the 'stack' field of the resulting workitem.
# Note that the :stack merge_type also creates a 'stack_attributes' field
@@ -192,11 +210,11 @@
# 'stack_attributes' => { 'merge'=> 'highest', 'merge_type' => 'stack' } }
#
# This could prove useful for participant having to deal with multiple merge
# strategy results.
#
- # ==== :union
+ # ==== :merge_type => :union
#
# (Available from ruote 2.3.0)
#
# Will override atomic fields, concat arrays and merge hashes...
#
@@ -212,11 +230,11 @@
# 'c' => { 'aa' => 'bb', 'cc' => 'dd' } }
#
# Warning: duplicates in arrays present _before_ the merge will be removed
# as well.
#
- # ==== :concat
+ # ==== :merge_type => :concat
#
# (Available from ruote 2.3.0)
#
# Much like :union, but duplicates are not removed. Thus
#
@@ -227,54 +245,41 @@
#
# { 'a' => 1,
# 'b' => [ 'x', 'y', 'y', 'z' ],
# 'c' => { 'aa' => 'bb', 'cc' => 'dd' } }
#
- # ==== :deep
+ # ==== :merge_type => :deep
#
# (Available from ruote 2.3.0)
#
# Identical to :concat but hashes are merged with deep_merge (ActiveSupport
# flavour).
#
- # ==== :ignore
+ # ==== :merge_type => :ignore
#
# (Available from ruote 2.3.0)
#
# A very simple merge type, the workitems given back by the branches are
# simply discarded and the workitem as passed to the concurrence expression
# is used to reply to the parent expression (of the concurrence expression).
#
# :merge_type can be shortened to :mt.
#
- #
- # === :over_if (and :over_unless) attribute
- #
- # Like the :count attribute controls how many branches have to reply before
- # a concurrence ends, the :over attribute is used to specify a condition
- # upon which the concurrence will [prematurely] end.
- #
- # concurrence :over_if => '${f:over}'
- # alpha
- # bravo
- # charly
- # end
- #
- # will end the concurrence as soon as one of the branches replies with a
- # workitem whose field 'over' is set to true. (the remaining branches will
- # get cancelled unless :remaining => :forget is set).
- #
- # :over_unless needs no explanation.
- #
class ConcurrenceExpression < FlowExpression
- include MergeMixin
-
names :concurrence
COUNT_R = /^-?\d+$/
+ # This method is used by some walking routines when analyzsing
+ # execution trees. Returns true for concurrence (and concurrent iterator).
+ #
+ def is_concurrent?
+
+ true
+ end
+
def apply
return do_reply_to_parent(h.applied_workitem) if tree_children.empty?
#
@@ -302,11 +307,18 @@
%w[ override mix isolate stack union ignore concat deep ])
h.remaining = att(
[ :remaining, :rem, :r ],
%w[ cancel forget wait ])
- h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {}
+ #h.workitems = (h.cmerge == 'first' || h.cmerge == 'last') ? [] : {}
+ #
+ # now merging iteratively, not keeping track of all the workitems,
+ # but still able to deal with old flows with active h.workitems
+ #
+ h.workitems = [] if %w[ highest lowest ].include?(h.cmerge)
+ #
+ # still need to keep track of rank to get the right merging
h.over = false
apply_children
@@ -323,24 +335,22 @@
#
# since workitem field merging might happen, better to work on
# a copy of the workitem (so that history, coming afterwards,
# doesn't see a modified version of the workitem)
- if h.cmerge == 'first' || h.cmerge == 'last'
- h.workitems << workitem
- else
- h.workitems[workitem['fei']['expid']] = workitem
- end
-
if h.wait_for && tag = workitem['fields']['__left_tag__']
h.wait_for.delete(tag)
end
over = h.over
h.over = over || over?(workitem)
+ keep(workitem)
+ # is done after the over? determination for its looks at 'winner'
+
if (not over) && h.over
+ #
# just became 'over'
reply_to_parent(nil)
elsif h.over && h.remaining == 'wait'
@@ -360,10 +370,46 @@
end
end
protected
+ def keep(workitem)
+
+ h.workitems = h.workitems.values if h.workitems.is_a?(Hash)
+ # align legacy expressions on new simplified way
+
+ if h.workitems
+ #
+ # the old way (still used for highest / lowest)
+
+ h.workitems << workitem
+ return
+ end
+
+ #
+ # the new way, merging immediately
+
+ h.workitem_count = (h.workitem_count || 0) + 1
+
+ return if h.cmerge_type == 'ignore'
+
+ # preparing target and source in the right order for merging
+
+ target, source = h.workitem, workitem
+ if
+ h.cmerge == 'first' &&
+ ! %w[ stack union concat deep isolate ].include?(h.cmerge_type)
+ then
+ target, source = source, target
+ end
+ target, source = source, target if target && target.delete('winner')
+ target, source = source, target if source == nil
+
+ h.workitem = Ruote.merge_workitem(
+ workitem_index(workitem), target, source, h.cmerge_type)
+ end
+
def apply_children
# a) register children
# b) persist
# c) trigger children applies
@@ -391,11 +437,12 @@
workitem['winner'] = true
true
elsif h.wait_for
h.wait_for.empty?
else
- (h.workitems.size >= expected_count)
+ (workitem_count + 1 >= expected_count)
+ # the + 1 is necessary since #keep hasn't yet been called
end
end
# How many branch replies are expected before the concurrence is over ?
#
@@ -423,17 +470,17 @@
#
# remaining 'wait' case first
if h.remaining == 'wait'
- if h.workitems.size >= count_list_size
+ if workitem_count >= count_list_size
#
# all children have replied
- workitem = merge_all_workitems
+ h.workitem = final_merge
- do_unpersist && super(workitem, false)
+ do_unpersist && super(h.workitem, false)
elsif h.children_cancelled == nil
#
# the concurrence is over, let's cancel all children and then
# wait for them
@@ -448,58 +495,107 @@
end
#
# remaining 'forget' and 'cancel' cases
- workitem = merge_all_workitems
+ h.workitem = final_merge
if h.children.empty?
- do_unpersist && super(workitem, false)
+ do_unpersist && super(h.workitem, false)
elsif h.remaining == 'cancel'
if do_unpersist
- super(workitem, false)
+ super(h.workitem, false)
h.children.each { |i| @context.storage.put_msg('cancel', 'fei' => i) }
end
else # h.remaining == 'forget'
h.variables = compile_variables
h.forgotten = true
- do_persist && super(workitem, false)
+ do_persist && super(h.workitem, false)
end
end
# Called by #reply_to_parent, returns the unique, merged, workitem that
# will be fed back to the parent expression.
#
- def merge_all_workitems
+ def final_merge
- return h.applied_workitem if h.workitems.size < 1
- return h.applied_workitem if h.cmerge_type == 'ignore'
+ wi = if h.workitem
- wis = case h.cmerge
- when 'first'
- h.workitems.reverse
- when 'last'
- h.workitems
- when 'highest', 'lowest'
- is = h.workitems.keys.sort.collect { |k| h.workitems[k] }
- h.cmerge == 'highest' ? is.reverse : is
+ h.workitem
+
+ elsif h.cmerge_type == 'ignore' || h.workitems.nil? || h.workitems.empty?
+
+ h.applied_workitem
+
+ else
+
+ wis = h.workitems
+
+ if %w[ highest lowest ].include?(h.cmerge)
+ wis = h.workitems.sort_by { |wi| wi['fei']['expid'] }
+ end
+
+ if
+ %w[ first highest ].include?(h.cmerge) &&
+ ! %w[ stack union concat deep ].include?(h.cmerge_type)
+ then
+ wis = wis.reverse
+ end
+
+ as, bs = wis.partition { |wi| wi.delete('winner') }
+ wis = bs + as
+ #
+ # the 'winner' is the workitem that triggered successfully the
+ # :over_if or :over_unless, let's take him precedence in the merge...
+
+ merge_workitems(wis, h.cmerge_type)
end
- as, bs = wis.partition { |wi| wi.delete('winner') }
- wis = bs + as
- #
- # the 'winner' is the workitem that triggered successfully the
- # :over_if or :over_unless, let's take him precedence in the merge...
+ if h.cmerge_type == 'stack'
+ wi['fields']['stack_attributes'] = compile_atts
+ end
- merge_workitems(wis, h.cmerge_type)
+ wi
+ end
+
+ # Returns the current count of workitem replies.
+ #
+ def workitem_count
+
+ h.workitems ? h.workitems.size : (h.workitem_count || 0)
+ end
+
+ # Given a workitem, returns its index (highest to lowest in the tree
+ # children... z-index?).
+ #
+ # Is overriden by the concurrent-iterator.
+ #
+ def workitem_index(workitem)
+
+ Ruote.extract_child_id(workitem['fei'])
+ end
+
+ # Given a list of workitems and a merge_type, will merge according to
+ # the merge type.
+ #
+ # The return value is the merged workitem.
+ #
+ # (Still used when dealing with highest/lowest merge_type and legacy
+ # concurrence/citerator expressions)
+ #
+ def merge_workitems(workitems, merge_type)
+
+ workitems.inject(nil) do |t, wi|
+ Ruote.merge_workitem(workitem_index(wi), t, wi, merge_type)
+ end
end
end
end