lib/ru/fe_concurrence.rb in openwferu-0.9.0 vs lib/ru/fe_concurrence.rb in openwferu-0.9.1

- old
+ new

@@ -1,6 +1,7 @@ # +#-- # Copyright (c) 2006, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -25,20 +26,22 @@ # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +#++ # # $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $ # # # "made in Japan" # # John Mettraux at openwfe.org # +require 'monitor' require 'ru/flowexpression' require 'ru/rudefinitions' require 'ru/ruutils' @@ -55,18 +58,24 @@ def apply (workitem) sync = lookup_attribute(A_SYNC, workitem) sync = "generic" if not sync - @sync_expression = get_expression_map().get_sync_class(sync).new() + @sync_expression = \ + get_expression_map().get_sync_class(sync).new(@attributes) + #threads = [] @children.each do |child| + @sync_expression.add_child(child) + end + + @children.each do |child| t = Thread.new do begin - @sync_expression.apply_child(self, child, workitem) + get_expression_pool().apply(child, workitem.dup) rescue Exception => e lwarn do "apply() caught exception in concurrent child\n" + OpenWFEru::exception_to_s(e) end @@ -91,31 +100,94 @@ #end end class GenericSyncExpression < SyncExpression - def initialize () + attr_accessor \ + :remaining_children, + :count, + :reply_count, + :cancel_remaining + + def initialize (attributes) + super() + + @remaining_children = [] @reply_count = 0 + + @count = determine_count(attributes) + @cancel_remaining = determine_remaining(attributes) end - def apply_child (synchable, child, workitem) - synchronize do - @application_context = synchable.application_context - ldebug { "apply_child() #{child.to_debug_s}" } - synchable.get_expression_pool().apply(child, workitem.dup) - end + def add_child (child) + @remaining_children << child end def reply (synchable, workitem) synchronize do + @application_context = synchable.application_context + # + # ldebug uses the application context + ldebug { "reply() #{workitem.lastExpressionId.to_debug_s}" } + @reply_count = @reply_count + 1 + + @remaining_children.delete(workitem.last_expression_id) + return workitem \ - if @reply_count >= synchable.children.length + if @remaining_children.length <= 0 + + if @count > 0 and @reply_count >= @count + treat_remaining_children(synchable) + return workitem + end + return nil end + end + + protected + + def treat_remaining_children (synchable) + + expool = synchable.get_expression_pool + + @remaining_children.each do |child| + if @cancel_remaining + expool.cancel(child) + else + expool.remove(child) + end + end + end + + def determine_remaining (attributes) + return attributes[A_REMAINING] == REM_CANCEL + end + + def determine_count (attributes) + s = attributes[A_COUNT] + return -1 if not s + i = s.to_i + return -1 if i < 1 + return i + end + end + + # + # Merges a workitem (source) into another (target). + # If inPlace is left to false, a brand new workitem is returned, + # else the merge occurs directly into the target workitem. + # + def merge (wiTarget, wiSource, inPlace=false) + + wiTarget = wiTarget.dup if not inPlace + + wiSource.attributes.each do | k, v | + wiTarget.attributes[k.dup] = v.dup end end end