lib/openwfe/expressions/fe_reserve.rb in openwferu-0.9.16 vs lib/openwfe/expressions/fe_reserve.rb in openwferu-0.9.17

- old
+ new

@@ -1,8 +1,8 @@ # #-- -# Copyright (c) 2007, John Mettraux, OpenWFE.org +# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # @@ -36,11 +36,10 @@ # # John Mettraux at openwfe.org # require 'thread' -require 'openwfe/expressions/fe_when' module OpenWFE # @@ -64,76 +63,149 @@ # # The sequence will not but run while the participant charly is active # and vice versa. The participant delta is not concerned. # # The mutex is a regular variable name, thus a mutex named "//toto" could - # be used to prevent segemnts of totally different process instances from + # be used to prevent segments of totally different process instances from # running. # - class ReserveExpression < WhenExpression + class ReserveExpression < FlowExpression # # A mutex for the whole class, it's meant to prevent 'reserve' # from reserving a workflow mutex simultaneaously. # - @@mutex = Mutex.new + #@@mutex = Mutex.new names :reserve + # + # The name of the mutex this expressions uses. + # It's a variable name, that means it can be prefixed with + # {nothing} (local scope), '/' (process scope) and '//' (engine / + # global scope). + # attr_accessor :mutex_name + # + # An instance variable for storing the applied workitem if the 'reserve' + # cannot be entered immediately. + # + attr_accessor :applied_workitem + + def apply (workitem) - if @children.size < 1 - reply_to_parent workitem - return - end + return reply_to_parent(workitem) \ + if @children.size < 1 @mutex_name = lookup_string_attribute :mutex, workitem - super + FlowMutex.synchronize do + + mutex = + lookup_variable(@mutex_name) || FlowMutex.new(@mutex_name) + + mutex.register self, workitem + end end def reply (workitem) - @@mutex.synchronize do + lookup_variable(@mutex_name).release self - delete_variable @mutex_name \ - if @consequence_triggered - # - # unset mutex - end + reply_to_parent workitem + end - super workitem + # + # Called by the FlowMutex to enter the 'reserved/critical' section. + # + def enter (workitem=nil) + + get_expression_pool.apply( + @children[0], workitem || @applied_workitem) end + end - protected + # + # A FlowMutex is a process variable (thus serializable) that keeps + # track of the expressions in a critical section (1!) or waiting for + # entering it. + # + # The current syncrhonization scheme is 1 thread mutex for all the + # FlowMutex. Shouldn't be too costly and the operations under sync are + # quite tiny. + # + class FlowMutex - def evaluate_condition + # + # Granularity level ? "big rock". Only one FlowMutex operation + # a a time for the whole business process engine... + # + @@class_mutex = Mutex.new - mutex = nil + attr_accessor :mutex_name + attr_accessor :feis - @@mutex.synchronize do + def initialize (mutex_name) - mutex = lookup_variable @mutex_name - - set_variable @mutex_name, fei.to_s \ - unless mutex - # - # reserve mutex - end + @mutex_name = mutex_name + @feis = [] + end - do_reply (mutex == nil) + def register (fexp, workitem) + + @feis << fexp.fei + + fexp.set_variable @mutex_name, self + + if @feis.size == 1 + # + # immediately let the expression enter the critical section + # + fexp.store_itself + fexp.enter workitem + else + # + # later... + # + fexp.applied_workitem = workitem + fexp.store_itself end + end - def apply_consequence (workitem) + def release (releaser) - @consequence_triggered = true + next_fei = nil - store_itself() + @@class_mutex.synchronize do - get_expression_pool.apply(@children[0], workitem) + current_fei = @feis.delete_at 0 + + releaser.set_variable @mutex_name, self + + log.warn "release() BAD! c:#{current_fei} r:#{releaser.fei}" \ + if releaser.fei != current_fei + + next_fei = @feis.first end + + return unless next_fei + + releaser.get_expression_pool.fetch_expression(next_fei).enter + end + + # + # Used by the ReserveExpression when looking up for a FlowMutex + # and registering into it. + # + def self.synchronize (&block) + + @@class_mutex.synchronize do + + block.call + end + end end end