lib/ru/fe_concurrence.rb in openwferu-0.9.0 vs lib/ru/fe_concurrence.rb in openwferu-0.9.1
- old
+ new
@@ -1,6 +1,7 @@
#
+#--
# Copyright (c) 2006, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
@@ -25,20 +26,22 @@
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
+#++
#
# $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
#
+require 'monitor'
require 'ru/flowexpression'
require 'ru/rudefinitions'
require 'ru/ruutils'
@@ -55,18 +58,24 @@
def apply (workitem)
sync = lookup_attribute(A_SYNC, workitem)
sync = "generic" if not sync
- @sync_expression = get_expression_map().get_sync_class(sync).new()
+ @sync_expression = \
+ get_expression_map().get_sync_class(sync).new(@attributes)
+
#threads = []
@children.each do |child|
+ @sync_expression.add_child(child)
+ end
+
+ @children.each do |child|
t = Thread.new do
begin
- @sync_expression.apply_child(self, child, workitem)
+ get_expression_pool().apply(child, workitem.dup)
rescue Exception => e
lwarn do
"apply() caught exception in concurrent child\n" +
OpenWFEru::exception_to_s(e)
end
@@ -91,31 +100,94 @@
#end
end
class GenericSyncExpression < SyncExpression
- def initialize ()
+ attr_accessor \
+ :remaining_children,
+ :count,
+ :reply_count,
+ :cancel_remaining
+
+ def initialize (attributes)
+
super()
+
+ @remaining_children = []
@reply_count = 0
+
+ @count = determine_count(attributes)
+ @cancel_remaining = determine_remaining(attributes)
end
- def apply_child (synchable, child, workitem)
- synchronize do
- @application_context = synchable.application_context
- ldebug { "apply_child() #{child.to_debug_s}" }
- synchable.get_expression_pool().apply(child, workitem.dup)
- end
+ def add_child (child)
+ @remaining_children << child
end
def reply (synchable, workitem)
synchronize do
+
@application_context = synchable.application_context
+ #
+ # ldebug uses the application context
+
ldebug { "reply() #{workitem.lastExpressionId.to_debug_s}" }
+
@reply_count = @reply_count + 1
+
+ @remaining_children.delete(workitem.last_expression_id)
+
return workitem \
- if @reply_count >= synchable.children.length
+ if @remaining_children.length <= 0
+
+ if @count > 0 and @reply_count >= @count
+ treat_remaining_children(synchable)
+ return workitem
+ end
+
return nil
end
+ end
+
+ protected
+
+ def treat_remaining_children (synchable)
+
+ expool = synchable.get_expression_pool
+
+ @remaining_children.each do |child|
+ if @cancel_remaining
+ expool.cancel(child)
+ else
+ expool.remove(child)
+ end
+ end
+ end
+
+ def determine_remaining (attributes)
+ return attributes[A_REMAINING] == REM_CANCEL
+ end
+
+ def determine_count (attributes)
+ s = attributes[A_COUNT]
+ return -1 if not s
+ i = s.to_i
+ return -1 if i < 1
+ return i
+ end
+ end
+
+ #
+ # Merges a workitem (source) into another (target).
+ # If inPlace is left to false, a brand new workitem is returned,
+ # else the merge occurs directly into the target workitem.
+ #
+ def merge (wiTarget, wiSource, inPlace=false)
+
+ wiTarget = wiTarget.dup if not inPlace
+
+ wiSource.attributes.each do | k, v |
+ wiTarget.attributes[k.dup] = v.dup
end
end
end