#
#--
# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# . Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# . Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
#
require 'openwfe/utils'
require 'openwfe/rudefinitions'
require 'openwfe/expressions/merge'
require 'openwfe/expressions/condition'
require 'openwfe/expressions/flowexpression'
require 'openwfe/expressions/iterator'
#
# base expressions like 'sequence' and 'concurrence'
#
module OpenWFE
#
# The concurrence expression will execute each of its (direct) children
# in parallel threads.
#
# Thus,
#
#   <concurrence>
#     <participant ref="pa" />
#     <participant ref="pb" />
#   </concurrence>
#
# Participants pa and pb will be 'treated' in parallel (quasi
# simultaneously).
#
# The concurrence expressions accept many attributes, that can get
# combined. By default, the concurrence waits for all its children to
# reply and returns the workitem of the first child that replied.
# The attributes tune this behaviour.
#
# count
#
#   <concurrence count="1">
#     <participant ref="pa" />
#     <participant ref="pb" />
#   </concurrence>
#
# The concurrence will be over as soon as 'pa' or 'pb' replied, i.e.
# as soon as "1" child replied.
#
# remaining
#
# The attribute 'remaining' can take two values 'cancel' (the default) and
# 'forget'.
# Cancelled children are completely wiped away, forgotten ones continue
# to operate but their reply will simply get discarded.
#
# over-if
#
# 'over-if' accepts a 'boolean expression' (something replying 'true' or
# 'false'), if the expression evaluates to true, the concurrence will be
# over and the remaining children will get cancelled (the default) or
# forgotten.
#
# merge
#
# By default, the first child to reply to its parent 'concurrence'
# expression 'wins', i.e. its workitem is used for resuming the flow (after
# the concurrence).
#
# [first] The default : the first child to reply wins
# [last] The last child to reply wins
# [highest] The first 'defined' child (in the list of children) will win
# [lowest] The last 'defined' child (in the list of children) will win
#
# Thus, in that example
#
#   <concurrence merge="lowest">
#     <participant ref="pa" />
#     <participant ref="pb" />
#   </concurrence>
#
# when the concurrence is done, the workitem of 'pb' is used to resume the
# flow after the concurrence.
#
# merge-type
#
# [override] No mix of values between the workitems occurs
# [mix] The default : priority is given to the 'winning' workitem but
# their values get mixed
# [isolate] the attributes of the workitem of each branch is placed
# in a field in the resulting workitem. For example, the
# attributes of the first branch will be stored under the
# field named '0' of the resulting workitem.
#
# The merge occurs at the top level of workitem attributes.
#
# More complex merge behaviour can be obtained by extending the
# GenericSyncExpression class. But the default sync options are already
# numerous and powerful by their combinations.
#
class ConcurrenceExpression < SequenceExpression
  include ConditionMixin

  names :concurrence

  # The sync expression instance in charge of gathering the replies
  # of the children and of deciding when the concurrence is over.
  attr_accessor :sync_expression

  #
  # Applies each child of this expression with its own workitem (see
  # get_workitem()), then notifies the sync expression that replies
  # can be processed.
  #
  # The class of the sync expression is looked up via the 'sync'
  # attribute (:generic when the attribute is not set).
  #
  def apply (workitem)

    sync_name = lookup_sym_attribute(:sync, workitem, :default => :generic)
    sync_class = get_expression_map.get_sync_class(sync_name)

    @sync_expression = sync_class.new(self, workitem)

    # all the children are registered in the sync expression (and the
    # expression is stored) before any child gets applied
    @children.each { |child| @sync_expression.add_child(child) }

    store_itself

    # children are applied one after the other, in @children order
    @children.each_with_index do |child, index|
      pool = get_expression_pool
      pool.apply(child, get_workitem(workitem, index))
    end

    #
    # Calling ready() on the local @sync_expression is not sufficient :
    # the freshest version of this concurrence expression is fetched
    # from the expression pool first.
    # This is especially important when using pure persistence.
    #
    fresh_self, _fei = get_expression_pool.fetch(@fei)
    fresh_self.sync_expression.ready(fresh_self)
  end

  #
  # Hands the workitem replied by a child over to the sync expression.
  #
  def reply (workitem)

    @sync_expression.reply(self, workitem)
  end

  protected

  #
  # Returns the workitem used for applying the child at the given index.
  # Here, it's a copy (dup) of the incoming workitem, the index is not
  # used.
  #
  def get_workitem (workitem, index)

    workitem.dup
  end
end
#
# This expression is a mix between a 'concurrence' and an 'iterator'.
# It understands the same attributes and behaves as an iterator that
# forks its children concurrently.
#
# Some examples :
#
#
#
#
#
# Within a Ruby process definition :
#
# sequence do
# set :field => "f", :value => %w{ Alan Bob Clarence }
# #...
# concurrent_iterator :on_field => "f", :to_field => "p" do
# participant "${p}"
# end
# end
#
class ConcurrentIteratorExpression < ConcurrenceExpression

  names :concurrent_iterator

  uses_template

  #
  # Prepares one child per iteration step, each time out of the first
  # raw child and with its own copy of the workitem, then lets
  # ConcurrenceExpression#apply do the rest of the work.
  #
  # Replies immediately to the parent expression when there is no raw
  # child or when the iterator has nothing to iterate on.
  #
  def apply (workitem)

    if raw_children.length < 1
      return reply_to_parent(workitem)
    end

    # one workitem per prepared child, handed back by get_workitem()
    @workitems = []

    iterator = Iterator.new(self, workitem)

    unless iterator.has_next?
      return reply_to_parent(workitem)
    end

    while iterator.has_next?

      branch_workitem = workitem.dup
      @workitems.push(branch_workitem)

      # iterator.next() receives the copy dedicated to this iteration
      # and returns the variables passed on to the prepared child
      variables = iterator.next(branch_workitem)

      get_expression_pool.tprepare_child(
        self,
        raw_children.first,
        iterator.index,
        true, # register child
        variables)
    end

    super
  end

  protected

  #
  # Returns the workitem that was set aside, during apply(), for the
  # child at the given index. The 'workitem' argument is not used.
  #
  def get_workitem (workitem, index)

    @workitems[index]
  end
end
#
# A base for sync expressions, it currently only provides the 'names'
# class method that subclasses use to declare their expression names.
# That may change.
#
class SyncExpression < ObjectWithMeta

  #
  # Nothing specific to initialize, simply chains to the parent class.
  #
  def initialize

    super
  end

  #
  # Declares the names of a sync expression class.
  #
  # The given names are turned into strings and made available via an
  # 'expression_names' method set up with meta_def().
  #
  def self.names (*exp_names)

    names_as_strings = exp_names.map { |name| name.to_s }

    meta_def(:expression_names) { names_as_strings }
  end
end
#
# The classical OpenWFE sync expression.
# Used by 'concurrence' and 'concurrent-iterator'
#
class GenericSyncExpression < SyncExpression

  names :generic

  attr_accessor \
    :remaining_children,
    :count,
    :reply_count,
    :cancel_remaining,
    :unready_queue

  #
  # 'synchable' is the expression whose children get synchronized (a
  # concurrence for example), 'workitem' is the workitem it was applied
  # with. Both are used for looking up the 'count', 'remaining',
  # 'merge' and 'merge_type' attributes.
  #
  # When the attributes are not set, 'merge' defaults to :first and
  # 'merge_type' to :mix.
  #
  def initialize (synchable, workitem)

    super()

    @remaining_children = []
    @reply_count = 0

    @count = determine_count(synchable, workitem)
    @cancel_remaining = cancel_remaining?(synchable, workitem)

    merge = synchable.lookup_sym_attribute(
      :merge, workitem, :default => :first)

    merge_type = synchable.lookup_sym_attribute(
      :merge_type, workitem, :default => :mix)

    synchable.ldebug { "new() merge_type is '#{merge_type}'" }

    @merge_array = MergeArray.new synchable.fei, merge, merge_type

    # replies arriving before ready() is called are parked here,
    # a nil queue means 'ready'
    @unready_queue = []
  end

  #
  # when all the children got applied concurrently, the concurrence
  # calls this method to notify the sync expression that replies
  # can be processed
  #
  def ready (synchable)

    synchable.ldebug do
      "ready() called by #{synchable.fei.to_debug_s} " +
      "#{@unready_queue.length} wi waiting"
    end

    queue = @unready_queue
    @unready_queue = nil
    synchable.store_itself

    queue.each do |workitem|
      break if do_reply(synchable, workitem)
        #
        # do_reply() will return 'true' as soon as the
        # concurrence is over, if this is the case, the
        # queue should not be treated anymore
    end
  end

  #
  # Registers a child whose reply is expected.
  #
  def add_child (child)

    @remaining_children << child
  end

  #
  # Receives the workitem replied by a child.
  #
  # As long as ready() has not been called, the workitem is queued
  # (and the synchable is stored), else it is processed immediately.
  #
  def reply (synchable, workitem)

    if @unready_queue

      @unready_queue << workitem

      synchable.store_itself

      synchable.ldebug do
        "#{self.class}.reply() "+
        "#{@unready_queue.length} wi waiting..."
      end

    else

      do_reply synchable, workitem
    end
  end

  protected

  #
  # Processes the reply of a child : the workitem is pushed into the
  # merge array, the child is removed from the remaining children and
  # the termination conditions are checked, in that order : no child
  # remaining, 'count' replies reached, 'over-if' / 'over-unless'
  # condition.
  #
  # Returns true if the synchable replied to its parent (it's over),
  # false otherwise (in which case the synchable gets stored).
  #
  def do_reply (synchable, workitem)

    synchable.ldebug do
      "#{self.class}.do_reply() from " +
      "#{workitem.last_expression_id.to_debug_s}"
    end

    @merge_array.push(synchable, workitem)

    @reply_count += 1

    @remaining_children.delete(workitem.last_expression_id)

    if @remaining_children.empty?
      reply_to_parent(synchable)
      return true
    end

    # @count is -1 when no (valid) 'count' attribute was given
    if @count > 0 && @reply_count >= @count
      treat_remaining_children(synchable)
      reply_to_parent(synchable)
      return true
    end

    #
    # over-if

    conditional =
      synchable.eval_condition("over-if", workitem, "over-unless")

    if conditional
      treat_remaining_children(synchable)
      reply_to_parent(synchable)
      return true
    end

    #
    # not over, resuming

    synchable.store_itself()

    false
  end

  #
  # Merges the workitems gathered so far and has the synchable reply
  # to its parent with the resulting workitem.
  #
  def reply_to_parent (synchable)

    workitem = @merge_array.do_merge
    synchable.reply_to_parent workitem
  end

  #
  # Cancels or forgets (depending on @cancel_remaining) each of the
  # children that have not replied yet.
  #
  def treat_remaining_children (synchable)

    expool = synchable.get_expression_pool

    @remaining_children.each do |child|

      synchable.ldebug do
        "#{self.class}.treat_remaining_children() " +
        "#{child.to_debug_s} " +
        "(cancel ? #{@cancel_remaining})"
      end

      if @cancel_remaining
        expool.cancel(child)
      else
        expool.forget(synchable, child)
      end
    end
  end

  #
  # Returns true if the 'remaining' attribute is set to 'cancel' (or
  # is not set at all, 'cancel' being the default).
  #
  def cancel_remaining? (synchable_expression, workitem)

    s = synchable_expression.lookup_sym_attribute(
      :remaining, workitem, :default => :cancel)

    s == :cancel
  end

  #
  # Returns the value of the 'count' attribute as an integer, or -1
  # when the attribute is missing or when its integer value is lower
  # than 1.
  #
  def determine_count (synchable_expression, workitem)

    c = synchable_expression.lookup_attribute :count, workitem
    return -1 unless c

    i = c.to_i
    i < 1 ? -1 : i
  end

  #
  # This inner class is used to gather workitems (via push()) before
  # the final merge
  # This final merge is triggered by calling the do_merge() method
  # which will return the resulting, merged workitem.
  #
  class MergeArray
    include MergeMixin

    attr_accessor \
      :synchable_fei,
      :workitem,
      :workitems_by_arrival,
      :workitems_by_altitude,
      :merge,
      :merge_type

    #
    # 'merge' is expected to be one of :first, :last, :highest or
    # :lowest and 'merge_type' one of :mix, :override or :isolate.
    # Any other value is replaced by :first, respectively :mix (see
    # ensure_merge_settings()).
    #
    def initialize (synchable_fei, merge, merge_type)

      @synchable_fei = synchable_fei

      @merge = merge
      @merge_type = merge_type

      ensure_merge_settings

      @workitem = nil

      # the two lists are only needed when the merge is done at the
      # end (do_merge()), the other merges are done along the way
      # in @workitem
      if highest? || lowest?
        @workitems_by_arrival = []
        @workitems_by_altitude = []
      end
    end

    #
    # Stores (or merges immediately) a workitem that just came back.
    # The 'synchable' argument is currently not used.
    #
    def push (synchable, wi)

      if isolate?
        push_in_isolation wi
      elsif last? || first?
        push_by_position wi
      else
        push_by_arrival wi
      end
    end

    #
    # Merge :first or :last, the incoming workitem is merged right
    # away with the workitem kept so far.
    #
    # With :first, the workitem kept so far is passed as the 'source'
    # of the merge, with :last, it's the incoming one.
    # (presumably, the source is the one that prevails, see
    # MergeMixin#merge_workitems for the actual rule)
    #
    def push_by_position (wi)

      source, target = if first?
        [ @workitem, wi ]
      else
        [ wi, @workitem ]
      end

      @workitem = merge_workitems target, source, override?
    end

    #
    # Merge type :isolate, the attributes of the incoming workitem
    # are copied (fulldup) under a key derived from the child id.
    # The resulting workitem is a copy of the first workitem pushed,
    # with its attributes replaced by an initially empty hash.
    #
    def push_in_isolation (wi)

      unless @workitem
        @workitem = wi.dup
        @workitem.attributes = {}
      end

      key = get_child_id wi

      @workitem.attributes[key.to_s] =
        OpenWFE::fulldup(wi.attributes)
    end

    #
    # Merge :highest or :lowest, the workitem is kept in arrival
    # order and at the index given by its child id, the merge itself
    # is left to do_merge().
    #
    def push_by_arrival (wi)

      index = Integer(get_child_id(wi))

      @workitems_by_arrival << wi
      @workitems_by_altitude[index] = wi
    end

    #
    # merges the workitems stored here
    #
    # Returns nil if no workitem got pushed beforehand.
    #
    def do_merge

      return @workitem if @workitem

      # the lists are not set for the :first and :last merges (which
      # normally end up in @workitem), falling back to empty lists
      # so that an early call yields nil whatever the merge
      by_arrival = @workitems_by_arrival || []
      by_altitude = @workitems_by_altitude || []

      list = if first?
        by_arrival.reverse
      elsif last?
        by_arrival
      elsif highest?
        by_altitude.reverse
      elsif lowest?
        by_altitude
      end

      result = nil

      list.each do |wi|
        # children that did not reply leave holes in the altitude list
        next unless wi
        result = merge_workitems result, wi, override?
      end

      result
    end

    protected

    def first?
      @merge == :first
    end
    def last?
      @merge == :last
    end
    def highest?
      @merge == :highest
    end
    def lowest?
      @merge == :lowest
    end

    def mix?
      @merge_type == :mix
    end
    def override?
      @merge_type == :override
    end
    def isolate?
      @merge_type == :isolate
    end

    #
    # Returns the child id of the expression that just
    # replied with the given workitem.
    #
    # When the workitem comes back with a wfid different from the
    # one of the synchable, the last sub instance id of its fei is
    # returned instead.
    #
    def get_child_id (workitem)

      return workitem.fei.child_id \
        if workitem.fei.wfid == @synchable_fei.wfid

      workitem.fei.last_sub_instance_id
    end

    #
    # Making sure @merge and @merge_type are set to
    # appropriate values.
    #
    def ensure_merge_settings

      @merge_type = :mix unless override? || isolate?
      @merge = :first unless last? || highest? || lowest?
    end
  end
end
end