lib/openwfe/expressions/fe_reserve.rb in openwferu-0.9.16 vs lib/openwfe/expressions/fe_reserve.rb in openwferu-0.9.17
- old
+ new
@@ -1,8 +1,8 @@
#
#--
-# Copyright (c) 2007, John Mettraux, OpenWFE.org
+# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
@@ -36,11 +36,10 @@
#
# John Mettraux at openwfe.org
#
require 'thread'
-require 'openwfe/expressions/fe_when'
module OpenWFE
#
@@ -64,76 +63,149 @@
#
# The sequence will not but run while the participant charly is active
# and vice versa. The participant delta is not concerned.
#
# The mutex is a regular variable name, thus a mutex named "//toto" could
- # be used to prevent segemnts of totally different process instances from
+ # be used to prevent segments of totally different process instances from
# running.
#
- class ReserveExpression < WhenExpression
+ class ReserveExpression < FlowExpression
#
# A mutex for the whole class, it's meant to prevent 'reserve'
# from reserving a workflow mutex simultaneaously.
#
- @@mutex = Mutex.new
+ #@@mutex = Mutex.new
names :reserve
+ #
+ # The name of the mutex this expressions uses.
+ # It's a variable name, that means it can be prefixed with
+ # {nothing} (local scope), '/' (process scope) and '//' (engine /
+ # global scope).
+ #
attr_accessor :mutex_name
+ #
+ # An instance variable for storing the applied workitem if the 'reserve'
+ # cannot be entered immediately.
+ #
+ attr_accessor :applied_workitem
+
+
def apply (workitem)
- if @children.size < 1
- reply_to_parent workitem
- return
- end
+ return reply_to_parent(workitem) \
+ if @children.size < 1
@mutex_name = lookup_string_attribute :mutex, workitem
- super
+ FlowMutex.synchronize do
+
+ mutex =
+ lookup_variable(@mutex_name) || FlowMutex.new(@mutex_name)
+
+ mutex.register self, workitem
+ end
end
def reply (workitem)
- @@mutex.synchronize do
+ lookup_variable(@mutex_name).release self
- delete_variable @mutex_name \
- if @consequence_triggered
- #
- # unset mutex
- end
+ reply_to_parent workitem
+ end
- super workitem
+ #
+ # Called by the FlowMutex to enter the 'reserved/critical' section.
+ #
+ def enter (workitem=nil)
+
+ get_expression_pool.apply(
+ @children[0], workitem || @applied_workitem)
end
+ end
- protected
+ #
+ # A FlowMutex is a process variable (thus serializable) that keeps
+ # track of the expressions in a critical section (1!) or waiting for
+ # entering it.
+ #
+ # The current syncrhonization scheme is 1 thread mutex for all the
+ # FlowMutex. Shouldn't be too costly and the operations under sync are
+ # quite tiny.
+ #
+ class FlowMutex
- def evaluate_condition
+ #
+ # Granularity level ? "big rock". Only one FlowMutex operation
+ # a a time for the whole business process engine...
+ #
+ @@class_mutex = Mutex.new
- mutex = nil
+ attr_accessor :mutex_name
+ attr_accessor :feis
- @@mutex.synchronize do
+ def initialize (mutex_name)
- mutex = lookup_variable @mutex_name
-
- set_variable @mutex_name, fei.to_s \
- unless mutex
- #
- # reserve mutex
- end
+ @mutex_name = mutex_name
+ @feis = []
+ end
- do_reply (mutex == nil)
+ def register (fexp, workitem)
+
+ @feis << fexp.fei
+
+ fexp.set_variable @mutex_name, self
+
+ if @feis.size == 1
+ #
+ # immediately let the expression enter the critical section
+ #
+ fexp.store_itself
+ fexp.enter workitem
+ else
+ #
+ # later...
+ #
+ fexp.applied_workitem = workitem
+ fexp.store_itself
end
+ end
- def apply_consequence (workitem)
+ def release (releaser)
- @consequence_triggered = true
+ next_fei = nil
- store_itself()
+ @@class_mutex.synchronize do
- get_expression_pool.apply(@children[0], workitem)
+ current_fei = @feis.delete_at 0
+
+ releaser.set_variable @mutex_name, self
+
+ log.warn "release() BAD! c:#{current_fei} r:#{releaser.fei}" \
+ if releaser.fei != current_fei
+
+ next_fei = @feis.first
end
+
+ return unless next_fei
+
+ releaser.get_expression_pool.fetch_expression(next_fei).enter
+ end
+
+ #
+ # Used by the ReserveExpression when looking up for a FlowMutex
+ # and registering into it.
+ #
+ def self.synchronize (&block)
+
+ @@class_mutex.synchronize do
+
+ block.call
+ end
+ end
end
end