lib/openwfe/expressions/fe_concurrence.rb in ruote-0.9.19 vs lib/openwfe/expressions/fe_concurrence.rb in ruote-0.9.20
- old
+ new
@@ -1,43 +1,29 @@
-#
#--
-# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org
-# All rights reserved.
+# Copyright (c) 2006-2009, John Mettraux, jmettraux@gmail.com
#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
#
-# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
#
-# . Neither the name of the "OpenWFE" nor the names of its contributors may be
-# used to endorse or promote products derived from this software without
-# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
+# Made in Japan.
#++
-#
-#
-# "made in Japan"
-#
-# John Mettraux at openwfe.org
-#
require 'openwfe/utils'
require 'openwfe/rudefinitions'
require 'openwfe/expressions/merge'
require 'openwfe/expressions/condition'
@@ -134,80 +120,52 @@
class ConcurrenceExpression < SequenceExpression
include ConditionMixin
names :concurrence
- attr_accessor \
- :sync_expression
+ attr_accessor :sync_expression
-
def apply (workitem)
- sync = lookup_sym_attribute(
- :sync, workitem, :default => :generic)
+ extract_children # initializes the @children array
+ do_apply(workitem)
+ end
+
+ def reply (workitem)
+
+ @sync_expression.reply(self, workitem)
+ end
+
+ protected
+
+ def do_apply (workitem)
+
+ sync = lookup_sym_attribute(:sync, workitem, :default => :generic)
+
@sync_expression =
get_expression_map.get_sync_class(sync).new(self, workitem)
@children.each do |child|
- @sync_expression.add_child child
+ @sync_expression.add_child(child)
end
store_itself
- #concurrence = self
-
@children.each_with_index do |child, index|
- get_expression_pool.apply(
- child,
- get_workitem(workitem, index))
-
- #Thread.new do
- # begin
- # #ldebug { "apply() child : #{child.to_debug_s}" }
- # concurrence.synchronize do
- # get_expression_pool().apply(
- # child,
- # #workitem.dup)
- # get_workitem(workitem, index))
- # end
- # rescue Exception => e
- # lwarn do
- # "apply() " +
- # "caught exception in concurrent child " +
- # child.to_debug_s + "\n" +
- # OpenWFE::exception_to_s(e)
- # end
- # end
- #end
+ get_expression_pool.apply(child, get_workitem(workitem, index))
end
- #@sync_expression.ready(self)
- #
- # this is insufficient, have to do that :
-
- #synchronize do
- #
- # Making sure the freshest version of the concurrence
- # expression is used.
- # This is especially important when using pure persistence.
- #
- reloaded_self, _fei = get_expression_pool.fetch @fei
- reloaded_self.sync_expression.ready reloaded_self
- #end
+ reloaded_self, _fei = get_expression_pool.fetch(@fei)
+ reloaded_self.sync_expression.ready(reloaded_self)
end
- def reply (workitem)
- @sync_expression.reply(self, workitem)
- end
+ def get_workitem (workitem, index)
- protected
-
- def get_workitem (workitem, index)
- workitem.dup
- end
+ workitem.dup
+ end
end
#
# This expression is a mix between a 'concurrence' and an 'iterator'.
# It understands the same attributes and behaves as an interator that
@@ -231,56 +189,47 @@
#
class ConcurrentIteratorExpression < ConcurrenceExpression
names :concurrent_iterator
- #attr_accessor :template
-
- uses_template
-
-
def apply (workitem)
- return reply_to_parent(workitem) \
- if raw_children.length < 1
+ return reply_to_parent(workitem) if has_no_expression_child
@workitems = []
- iterator = Iterator.new self, workitem
+ iterator = Iterator.new(self, workitem)
return reply_to_parent(workitem) \
unless iterator.has_next?
while iterator.has_next?
wi = workitem.dup
-
+ vars = iterator.next(wi)
@workitems << wi
- vars = iterator.next wi
-
- #rawexp = get_expression_pool.prepare_from_template(
- # self, nil, iterator.index, template, vars)
- #@children << rawexp.fei
-
get_expression_pool.tprepare_child(
self,
raw_children.first,
iterator.index,
- true, # register child
- vars)
+ :register_child => true,
+ :dont_store_parent => true,
+ :variables => vars)
end
- super
+ store_itself
+
+ do_apply(workitem)
end
protected
- def get_workitem (workitem, index)
+ def get_workitem (wi, index)
- @workitems[index]
- end
+ @workitems[index]
+ end
end
#
# A base for sync expressions, currently empty.
# That may change.
@@ -345,37 +294,33 @@
# when all the children got applied concurrently, the concurrence
# calls this method to notify the sync expression that replies
# can be processed
#
def ready (synchable)
- #synchable.synchronize do
synchable.ldebug do
"ready() called by #{synchable.fei.to_debug_s} " +
"#{@unready_queue.length} wi waiting"
end
queue = @unready_queue
@unready_queue = nil
synchable.store_itself
- queue.each do |workitem|
- break if do_reply(synchable, workitem)
- #
- # do_reply() will return 'true' as soon as the
- # concurrence is over, if this is the case, the
- # queue should not be treated anymore
- end
- #end
+ queue.each { |workitem| break if do_reply(synchable, workitem) }
+ #
+ # do_reply() will return 'true' as soon as the
+ # concurrence is over, if this is the case, the
+ # queue should not be treated anymore
end
def add_child (child)
+
@remaining_children << child
end
def reply (synchable, workitem)
- #synchable.synchronize do
if @unready_queue
@unready_queue << workitem
@@ -385,277 +330,269 @@
"#{self.class}.reply() "+
"#{@unready_queue.length} wi waiting..."
end
else
- do_reply synchable, workitem
+
+ do_reply(synchable, workitem)
end
- #end
end
protected
- def do_reply (synchable, workitem)
+ def do_reply (synchable, workitem)
- synchable.ldebug do
- "#{self.class}.do_reply() from " +
- "#{workitem.last_expression_id.to_debug_s}"
- end
+ synchable.ldebug do
+ "#{self.class}.do_reply() from " +
+ "#{workitem.last_expression_id.to_debug_s}"
+ end
- @merge_array.push(synchable, workitem)
+ @merge_array.push(synchable, workitem)
- @reply_count = @reply_count + 1
+ @reply_count = @reply_count + 1
- @remaining_children.delete(workitem.last_expression_id)
+ @remaining_children.delete(workitem.last_expression_id)
- #synchable.ldebug do
- # "#{self.class}.do_reply() "+
- # "remaining children : #{@remaining_children.length}"
- #end
-
- if @remaining_children.length <= 0
- reply_to_parent(synchable)
- return true
- end
-
- if @count > 0 and @reply_count >= @count
- treat_remaining_children(synchable)
- reply_to_parent(synchable)
- return true
- end
-
+ if @over #over
#
- # over-if
+ # sync is over, don't determine it again
+ #
+ synchable.store_itself
+ return true
+ end
- conditional =
- synchable.eval_condition("over-if", workitem, "over-unless")
+ if @remaining_children.length <= 0
- if conditional
- treat_remaining_children(synchable)
- reply_to_parent(synchable)
- return true
- end
+ reply_to_parent(synchable)
+ return true
+ end
- #
- # not over, resuming
+ if @count > 0 and @reply_count >= @count
- synchable.store_itself()
+ @over = true
+ synchable.store_itself
- #synchable.ldebug do
- # "#{self.class}.do_reply() not replying to parent "+
- # "#{workitem.last_expression_id.to_debug_s}"
- #end
+ #treat_remaining_children(synchable)
+ synchable.get_workqueue.push(
+ self, :treat_remaining_children, synchable)
- false
+ return true
end
- def reply_to_parent (synchable)
+ #
+ # over-if
- workitem = @merge_array.do_merge
+ if synchable.eval_condition('over-if', workitem, 'over-unless')
- synchable.reply_to_parent workitem
+ @over = true
+ synchable.store_itself
+
+ #treat_remaining_children(synchable)
+ synchable.get_workqueue.push(
+ self, :treat_remaining_children, synchable)
+
+ return true
end
- def treat_remaining_children (synchable)
+ #
+ # not over, resuming
- expool = synchable.get_expression_pool
+ synchable.store_itself
- @remaining_children.each do |child|
+ false
+ end
- synchable.ldebug do
- "#{self.class}.treat_remainining_children() " +
- "#{child.to_debug_s} " +
- "(cancel ? #{@cancel_remaining})"
- end
+ def reply_to_parent (synchable)
- if @cancel_remaining
- expool.cancel(child)
- else
- expool.forget(synchable, child)
- end
- end
- end
+ workitem = @merge_array.do_merge
- def cancel_remaining? (synchable_expression, workitem)
+ synchable.reply_to_parent(workitem)
+ end
- s = synchable_expression.lookup_sym_attribute(
- :remaining, workitem, :default => :cancel)
+ def treat_remaining_children (synchable)
- (s == :cancel)
- end
+ @remaining_children.each do |child|
- def determine_count (synchable_expression, workitem)
+ synchable.ldebug do
+ "#{self.class}.treat_remainining_children() " +
+ "#{child.to_debug_s} " +
+ "(cancel ? #{@cancel_remaining})"
+ end
- c = synchable_expression.lookup_attribute :count, workitem
- return -1 if not c
- i = c.to_i
- return -1 if i < 1
- i
+ if @cancel_remaining
+ synchable.get_expression_pool.cancel(child)
+ else
+ synchable.get_expression_pool.forget(synchable, child)
+ end
end
- #
- # This inner class is used to gather workitems (via push()) before
- # the final merge
- # This final merge is triggered by calling the do_merge() method
- # which will return the resulting, merged workitem.
- #
- class MergeArray
- include MergeMixin
+ reply_to_parent(synchable)
+ end
- attr_accessor \
- :synchable_fei,
- :workitem,
- :workitems_by_arrival,
- :workitems_by_altitude,
- :merge,
- :merge_type
+ def cancel_remaining? (synchable_expression, workitem)
- def initialize (synchable_fei, merge, merge_type)
+ s = synchable_expression.lookup_sym_attribute(
+ :remaining, workitem, :default => :cancel)
- @synchable_fei = synchable_fei
+ (s == :cancel)
+ end
- @merge = merge
- @merge_type = merge_type
+ def determine_count (synchable_expression, workitem)
- ensure_merge_settings
+ c = synchable_expression.lookup_attribute(:count, workitem)
+ return -1 if not c
+ i = c.to_i
+ i < 1 ? -1 : i
+ end
- @workitem = nil
+ #
+ # This inner class is used to gather workitems (via push()) before
+ # the final merge
+ # This final merge is triggered by calling the do_merge() method
+ # which will return the resulting, merged workitem.
+ #
+ class MergeArray
+ include MergeMixin
- if highest? or lowest?
- @workitems_by_arrival = []
- @workitems_by_altitude = []
- end
- end
+ attr_accessor \
+ :synchable_fei,
+ :workitem,
+ :workitems_by_arrival,
+ :workitems_by_altitude,
+ :merge,
+ :merge_type
- def push (synchable, wi)
+ def initialize (synchable_fei, merge, merge_type)
- #synchable.ldebug do
- # "push() isolate? #{isolate?}"
- #end
+ @synchable_fei = synchable_fei
- if isolate?
- push_in_isolation wi
- elsif last? or first?
- push_by_position wi
- else
- push_by_arrival wi
- end
- end
+ @merge = merge
+ @merge_type = merge_type
- def push_by_position (wi)
+ ensure_merge_settings
- source, target = if first?
- [ @workitem, wi ]
- else
- [ wi, @workitem ]
- end
- @workitem = merge_workitems target, source, override?
+ @workitem = nil
+
+ if highest? or lowest?
+ @workitems_by_arrival = []
+ @workitems_by_altitude = []
end
+ end
- def push_in_isolation (wi)
+ def push (synchable, wi)
- unless @workitem
- @workitem = wi.dup
- att = @workitem.attributes
- @workitem.attributes = {}
- end
+ if isolate?
+ push_in_isolation(wi)
+ elsif last? or first?
+ push_by_position(wi)
+ else
+ push_by_arrival(wi)
+ end
+ end
- #key = synchable.children.index wi.last_expression_id
- #key = wi.last_expression_id.child_id
- key = get_child_id wi
+ def push_by_position (wi)
- @workitem.attributes[key.to_s] =
- OpenWFE::fulldup(wi.attributes)
+ source, target = if first?
+ [ @workitem, wi ]
+ else
+ [ wi, @workitem ]
end
+ @workitem = merge_workitems(target, source, override?)
+ end
- def push_by_arrival (wi)
+ def push_in_isolation (wi)
- #index = synchable.children.index wi.last_expression_id
- #index = Integer(wi.last_expression_id.child_id)
- index = Integer(get_child_id(wi))
-
- @workitems_by_arrival << wi
- @workitems_by_altitude[index] = wi
+ unless @workitem
+ @workitem = wi.dup
+ att = @workitem.attributes
+ @workitem.attributes = {}
end
- #
- # merges the workitems stored here
- #
- def do_merge
+ key = get_child_id(wi)
- return @workitem if @workitem
+ @workitem.attributes[key.to_s] = OpenWFE.fulldup(wi.attributes)
+ end
- list = if first?
- @workitems_by_arrival.reverse
- elsif last?
- @workitems_by_arrival
- elsif highest?
- @workitems_by_altitude.reverse
- elsif lowest?
- @workitems_by_altitude
- end
+ def push_by_arrival (wi)
- result = nil
+ #index = synchable.children.index wi.last_expression_id
+ #index = Integer(wi.last_expression_id.child_id)
+ index = Integer(get_child_id(wi))
- list.each do |wi|
- next unless wi
- result = merge_workitems result, wi, override?
- end
+ @workitems_by_arrival << wi
+ @workitems_by_altitude[index] = wi
+ end
- #puts "___ result :"
- #puts result.to_s
- #puts
+ #
+ # merges the workitems stored here
+ #
+ def do_merge
+ return @workitem if @workitem
+
+ list = if first?
+ @workitems_by_arrival.reverse
+ elsif last?
+ @workitems_by_arrival
+ elsif highest?
+ @workitems_by_altitude.reverse
+ elsif lowest?
+ @workitems_by_altitude
+ end
+
+ list.inject(nil) do |result, wi|
+ result = merge_workitems(result, wi, override?) if wi
result
end
+ end
- protected
+ protected
- def first?
- @merge == :first
- end
- def last?
- @merge == :last
- end
- def highest?
- @merge == :highest
- end
- def lowest?
- @merge == :lowest
- end
+ def first?
+ @merge == :first
+ end
+ def last?
+ @merge == :last
+ end
+ def highest?
+ @merge == :highest
+ end
+ def lowest?
+ @merge == :lowest
+ end
- def mix?
- @merge_type == :mix
- end
- def override?
- @merge_type == :override
- end
- def isolate?
- @merge_type == :isolate
- end
+ def mix?
+ @merge_type == :mix
+ end
+ def override?
+ @merge_type == :override
+ end
+ def isolate?
+ @merge_type == :isolate
+ end
- #
- # Returns the child id of the expression that just
- # replied with the given workitem.
- #
- def get_child_id (workitem)
+ #
+ # Returns the child id of the expression that just
+ # replied with the given workitem.
+ #
+ def get_child_id (workitem)
- return workitem.fei.child_id \
- if workitem.fei.wfid == @synchable_fei.wfid
+ return workitem.fei.child_id \
+ if workitem.fei.wfid == @synchable_fei.wfid
- workitem.fei.last_sub_instance_id
- end
+ workitem.fei.last_sub_instance_id
+ end
- #
- # Making sure @merge and @merge_type are set to
- # appropriate values.
- #
- def ensure_merge_settings
+ #
+ # Making sure @merge and @merge_type are set to
+ # appropriate values.
+ #
+ def ensure_merge_settings
- @merge_type = :mix unless override? or isolate?
- @merge = :first unless last? or highest? or lowest?
- end
+ @merge_type = :mix unless override? or isolate?
+ @merge = :first unless last? or highest? or lowest?
end
+ end
end
end