lib/openwfe/expressions/fe_concurrence.rb in ruote-0.9.19 vs lib/openwfe/expressions/fe_concurrence.rb in ruote-0.9.20

- old
+ new

@@ -1,43 +1,29 @@ -# #-- -# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org -# All rights reserved. +# Copyright (c) 2006-2009, John Mettraux, jmettraux@gmail.com # -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: # -# . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. # -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. # -# . Neither the name of the "OpenWFE" nor the names of its contributors may be -# used to endorse or promote products derived from this software without -# specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. +# Made in Japan. #++ -# -# -# "made in Japan" -# -# John Mettraux at openwfe.org -# require 'openwfe/utils' require 'openwfe/rudefinitions' require 'openwfe/expressions/merge' require 'openwfe/expressions/condition' @@ -134,80 +120,52 @@ class ConcurrenceExpression < SequenceExpression include ConditionMixin names :concurrence - attr_accessor \ - :sync_expression + attr_accessor :sync_expression - def apply (workitem) - sync = lookup_sym_attribute( - :sync, workitem, :default => :generic) + extract_children # initializes the @children array + do_apply(workitem) + end + + def reply (workitem) + + @sync_expression.reply(self, workitem) + end + + protected + + def do_apply (workitem) + + sync = lookup_sym_attribute(:sync, workitem, :default => :generic) + @sync_expression = get_expression_map.get_sync_class(sync).new(self, workitem) @children.each do |child| - @sync_expression.add_child child + @sync_expression.add_child(child) end store_itself - #concurrence = self - @children.each_with_index do |child, index| - get_expression_pool.apply( - child, - get_workitem(workitem, index)) - - #Thread.new do - # begin - # #ldebug { "apply() child : #{child.to_debug_s}" } - # concurrence.synchronize do - # get_expression_pool().apply( - # child, - # #workitem.dup) - # get_workitem(workitem, index)) - # end - # rescue Exception => e - # lwarn do - # "apply() " + - # "caught exception in concurrent child " + - # child.to_debug_s + "\n" + - # OpenWFE::exception_to_s(e) - # end - # end - #end + get_expression_pool.apply(child, get_workitem(workitem, index)) end - #@sync_expression.ready(self) - # - # this is insufficient, have to do that : - - #synchronize do - # - # Making sure the freshest version of the concurrence - # expression is used. - # This is especially important when using pure persistence. - # - reloaded_self, _fei = get_expression_pool.fetch @fei - reloaded_self.sync_expression.ready reloaded_self - #end + reloaded_self, _fei = get_expression_pool.fetch(@fei) + reloaded_self.sync_expression.ready(reloaded_self) end - def reply (workitem) - @sync_expression.reply(self, workitem) - end + def get_workitem (workitem, index) - protected - - def get_workitem (workitem, index) - workitem.dup - end + workitem.dup + end end # # This expression is a mix between a 'concurrence' and an 'iterator'. # It understands the same attributes and behaves as an interator that @@ -231,56 +189,47 @@ # class ConcurrentIteratorExpression < ConcurrenceExpression names :concurrent_iterator - #attr_accessor :template - - uses_template - - def apply (workitem) - return reply_to_parent(workitem) \ - if raw_children.length < 1 + return reply_to_parent(workitem) if has_no_expression_child @workitems = [] - iterator = Iterator.new self, workitem + iterator = Iterator.new(self, workitem) return reply_to_parent(workitem) \ unless iterator.has_next? while iterator.has_next? wi = workitem.dup - + vars = iterator.next(wi) @workitems << wi - vars = iterator.next wi - - #rawexp = get_expression_pool.prepare_from_template( - # self, nil, iterator.index, template, vars) - #@children << rawexp.fei - get_expression_pool.tprepare_child( self, raw_children.first, iterator.index, - true, # register child - vars) + :register_child => true, + :dont_store_parent => true, + :variables => vars) end - super + store_itself + + do_apply(workitem) end protected - def get_workitem (workitem, index) + def get_workitem (wi, index) - @workitems[index] - end + @workitems[index] + end end # # A base for sync expressions, currently empty. # That may change. @@ -345,37 +294,33 @@ # when all the children got applied concurrently, the concurrence # calls this method to notify the sync expression that replies # can be processed # def ready (synchable) - #synchable.synchronize do synchable.ldebug do "ready() called by #{synchable.fei.to_debug_s} " + "#{@unready_queue.length} wi waiting" end queue = @unready_queue @unready_queue = nil synchable.store_itself - queue.each do |workitem| - break if do_reply(synchable, workitem) - # - # do_reply() will return 'true' as soon as the - # concurrence is over, if this is the case, the - # queue should not be treated anymore - end - #end + queue.each { |workitem| break if do_reply(synchable, workitem) } + # + # do_reply() will return 'true' as soon as the + # concurrence is over, if this is the case, the + # queue should not be treated anymore end def add_child (child) + @remaining_children << child end def reply (synchable, workitem) - #synchable.synchronize do if @unready_queue @unready_queue << workitem @@ -385,277 +330,269 @@ "#{self.class}.reply() "+ "#{@unready_queue.length} wi waiting..." end else - do_reply synchable, workitem + + do_reply(synchable, workitem) end - #end end protected - def do_reply (synchable, workitem) + def do_reply (synchable, workitem) - synchable.ldebug do - "#{self.class}.do_reply() from " + - "#{workitem.last_expression_id.to_debug_s}" - end + synchable.ldebug do + "#{self.class}.do_reply() from " + + "#{workitem.last_expression_id.to_debug_s}" + end - @merge_array.push(synchable, workitem) + @merge_array.push(synchable, workitem) - @reply_count = @reply_count + 1 + @reply_count = @reply_count + 1 - @remaining_children.delete(workitem.last_expression_id) + @remaining_children.delete(workitem.last_expression_id) - #synchable.ldebug do - # "#{self.class}.do_reply() "+ - # "remaining children : #{@remaining_children.length}" - #end - - if @remaining_children.length <= 0 - reply_to_parent(synchable) - return true - end - - if @count > 0 and @reply_count >= @count - treat_remaining_children(synchable) - reply_to_parent(synchable) - return true - end - + if @over #over # - # over-if + # sync is over, don't determine it again + # + synchable.store_itself + return true + end - conditional = - synchable.eval_condition("over-if", workitem, "over-unless") + if @remaining_children.length <= 0 - if conditional - treat_remaining_children(synchable) - reply_to_parent(synchable) - return true - end + reply_to_parent(synchable) + return true + end - # - # not over, resuming + if @count > 0 and @reply_count >= @count - synchable.store_itself() + @over = true + synchable.store_itself - #synchable.ldebug do - # "#{self.class}.do_reply() not replying to parent "+ - # "#{workitem.last_expression_id.to_debug_s}" - #end + #treat_remaining_children(synchable) + synchable.get_workqueue.push( + self, :treat_remaining_children, synchable) - false + return true end - def reply_to_parent (synchable) + # + # over-if - workitem = @merge_array.do_merge + if synchable.eval_condition('over-if', workitem, 'over-unless') - synchable.reply_to_parent workitem + @over = true + synchable.store_itself + + #treat_remaining_children(synchable) + synchable.get_workqueue.push( + self, :treat_remaining_children, synchable) + + return true end - def treat_remaining_children (synchable) + # + # not over, resuming - expool = synchable.get_expression_pool + synchable.store_itself - @remaining_children.each do |child| + false + end - synchable.ldebug do - "#{self.class}.treat_remainining_children() " + - "#{child.to_debug_s} " + - "(cancel ? #{@cancel_remaining})" - end + def reply_to_parent (synchable) - if @cancel_remaining - expool.cancel(child) - else - expool.forget(synchable, child) - end - end - end + workitem = @merge_array.do_merge - def cancel_remaining? (synchable_expression, workitem) + synchable.reply_to_parent(workitem) + end - s = synchable_expression.lookup_sym_attribute( - :remaining, workitem, :default => :cancel) + def treat_remaining_children (synchable) - (s == :cancel) - end + @remaining_children.each do |child| - def determine_count (synchable_expression, workitem) + synchable.ldebug do + "#{self.class}.treat_remainining_children() " + + "#{child.to_debug_s} " + + "(cancel ? #{@cancel_remaining})" + end - c = synchable_expression.lookup_attribute :count, workitem - return -1 if not c - i = c.to_i - return -1 if i < 1 - i + if @cancel_remaining + synchable.get_expression_pool.cancel(child) + else + synchable.get_expression_pool.forget(synchable, child) + end end - # - # This inner class is used to gather workitems (via push()) before - # the final merge - # This final merge is triggered by calling the do_merge() method - # which will return the resulting, merged workitem. - # - class MergeArray - include MergeMixin + reply_to_parent(synchable) + end - attr_accessor \ - :synchable_fei, - :workitem, - :workitems_by_arrival, - :workitems_by_altitude, - :merge, - :merge_type + def cancel_remaining? (synchable_expression, workitem) - def initialize (synchable_fei, merge, merge_type) + s = synchable_expression.lookup_sym_attribute( + :remaining, workitem, :default => :cancel) - @synchable_fei = synchable_fei + (s == :cancel) + end - @merge = merge - @merge_type = merge_type + def determine_count (synchable_expression, workitem) - ensure_merge_settings + c = synchable_expression.lookup_attribute(:count, workitem) + return -1 if not c + i = c.to_i + i < 1 ? -1 : i + end - @workitem = nil + # + # This inner class is used to gather workitems (via push()) before + # the final merge + # This final merge is triggered by calling the do_merge() method + # which will return the resulting, merged workitem. + # + class MergeArray + include MergeMixin - if highest? or lowest? - @workitems_by_arrival = [] - @workitems_by_altitude = [] - end - end + attr_accessor \ + :synchable_fei, + :workitem, + :workitems_by_arrival, + :workitems_by_altitude, + :merge, + :merge_type - def push (synchable, wi) + def initialize (synchable_fei, merge, merge_type) - #synchable.ldebug do - # "push() isolate? #{isolate?}" - #end + @synchable_fei = synchable_fei - if isolate? - push_in_isolation wi - elsif last? or first? - push_by_position wi - else - push_by_arrival wi - end - end + @merge = merge + @merge_type = merge_type - def push_by_position (wi) + ensure_merge_settings - source, target = if first? - [ @workitem, wi ] - else - [ wi, @workitem ] - end - @workitem = merge_workitems target, source, override? + @workitem = nil + + if highest? or lowest? + @workitems_by_arrival = [] + @workitems_by_altitude = [] end + end - def push_in_isolation (wi) + def push (synchable, wi) - unless @workitem - @workitem = wi.dup - att = @workitem.attributes - @workitem.attributes = {} - end + if isolate? + push_in_isolation(wi) + elsif last? or first? + push_by_position(wi) + else + push_by_arrival(wi) + end + end - #key = synchable.children.index wi.last_expression_id - #key = wi.last_expression_id.child_id - key = get_child_id wi + def push_by_position (wi) - @workitem.attributes[key.to_s] = - OpenWFE::fulldup(wi.attributes) + source, target = if first? + [ @workitem, wi ] + else + [ wi, @workitem ] end + @workitem = merge_workitems(target, source, override?) + end - def push_by_arrival (wi) + def push_in_isolation (wi) - #index = synchable.children.index wi.last_expression_id - #index = Integer(wi.last_expression_id.child_id) - index = Integer(get_child_id(wi)) - - @workitems_by_arrival << wi - @workitems_by_altitude[index] = wi + unless @workitem + @workitem = wi.dup + att = @workitem.attributes + @workitem.attributes = {} end - # - # merges the workitems stored here - # - def do_merge + key = get_child_id(wi) - return @workitem if @workitem + @workitem.attributes[key.to_s] = OpenWFE.fulldup(wi.attributes) + end - list = if first? - @workitems_by_arrival.reverse - elsif last? - @workitems_by_arrival - elsif highest? - @workitems_by_altitude.reverse - elsif lowest? - @workitems_by_altitude - end + def push_by_arrival (wi) - result = nil + #index = synchable.children.index wi.last_expression_id + #index = Integer(wi.last_expression_id.child_id) + index = Integer(get_child_id(wi)) - list.each do |wi| - next unless wi - result = merge_workitems result, wi, override? - end + @workitems_by_arrival << wi + @workitems_by_altitude[index] = wi + end - #puts "___ result :" - #puts result.to_s - #puts + # + # merges the workitems stored here + # + def do_merge + return @workitem if @workitem + + list = if first? + @workitems_by_arrival.reverse + elsif last? + @workitems_by_arrival + elsif highest? + @workitems_by_altitude.reverse + elsif lowest? + @workitems_by_altitude + end + + list.inject(nil) do |result, wi| + result = merge_workitems(result, wi, override?) if wi result end + end - protected + protected - def first? - @merge == :first - end - def last? - @merge == :last - end - def highest? - @merge == :highest - end - def lowest? - @merge == :lowest - end + def first? + @merge == :first + end + def last? + @merge == :last + end + def highest? + @merge == :highest + end + def lowest? + @merge == :lowest + end - def mix? - @merge_type == :mix - end - def override? - @merge_type == :override - end - def isolate? - @merge_type == :isolate - end + def mix? + @merge_type == :mix + end + def override? + @merge_type == :override + end + def isolate? + @merge_type == :isolate + end - # - # Returns the child id of the expression that just - # replied with the given workitem. - # - def get_child_id (workitem) + # + # Returns the child id of the expression that just + # replied with the given workitem. + # + def get_child_id (workitem) - return workitem.fei.child_id \ - if workitem.fei.wfid == @synchable_fei.wfid + return workitem.fei.child_id \ + if workitem.fei.wfid == @synchable_fei.wfid - workitem.fei.last_sub_instance_id - end + workitem.fei.last_sub_instance_id + end - # - # Making sure @merge and @merge_type are set to - # appropriate values. - # - def ensure_merge_settings + # + # Making sure @merge and @merge_type are set to + # appropriate values. + # + def ensure_merge_settings - @merge_type = :mix unless override? or isolate? - @merge = :first unless last? or highest? or lowest? - end + @merge_type = :mix unless override? or isolate? + @merge = :first unless last? or highest? or lowest? end + end end end