# #-- # Copyright (c) 2006-2007, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. #++ # # $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $ # # # "made in Japan" # # John Mettraux at openwfe.org # require 'uri' require 'monitor' require 'net/http' require 'rexml/document' require 'openwfe/utils' require 'openwfe/service' require 'openwfe/logging' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' require 'openwfe/util/lru_cache' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw_xml' include OpenWFE module OpenWFE # # a small help class for storing monitors provided on demand # to expressions that need them # class MonitorProvider include MonitorMixin, Logging MAX_MONITORS = 10000 def initialize (application_context=nil) super() @application_context = application_context @monitors = LRUCache.new(MAX_MONITORS) end def [] (key) synchronize do #ldebug { "[] caller :\n" + OpenWFE::caller_to_s(8) } mon = @monitors[key] if not mon #ldebug { "[] creating new Monitor for #{key}" } mon = Monitor.new @monitors[key] = mon else #ldebug { "[] already had Monitor for #{key}" } end return mon end end def delete (key) synchronize do #ldebug { "delete() removing Monitor for #{key}" } @monitors.delete(key) end end end GONE = "gone" # # This special flow expression id is used by the forget() method # (which is used by the forget expression and the concurrence # synchronization expressions) # GONE_PARENT_ID = FlowExpressionId.new GONE_PARENT_ID.engine_id = GONE GONE_PARENT_ID.initial_engine_id = GONE GONE_PARENT_ID.workflow_definition_url = GONE GONE_PARENT_ID.workflow_definition_name = GONE GONE_PARENT_ID.workflow_definition_revision = GONE GONE_PARENT_ID.workflow_instance_id = "-1" GONE_PARENT_ID.expression_name = GONE GONE_PARENT_ID.expression_id = "-1" GONE_PARENT_ID.freeze # # The ExpressionPool stores expressions (pieces of workflow instance). # It's the core of the workflow engine. # It relies on an expression storage for actual persistence of the # expressions. # class ExpressionPool < Service include MonitorMixin, OwfeServiceLocator @@last_given_instance_id = -1 # # storing at class level the last workflow instance id given def initialize (serviceName, applicationContext) super(serviceName, applicationContext) @monitors = MonitorProvider.new(application_context) end # # Obtains a unique monitor for an expression. # It avoids the need for the FlowExpression instances to include # the monitor mixin by themselves # def get_monitor (fei) return @monitors[fei] end # # Instantiates a workflow definition and launches it. # def launch (launchitem) rawExpression = buildRawExpression(launchitem) wi = build_workitem(launchitem) rawExpression.apply(wi) end # # launches a subprocess # def launch_template \ (requesting_expression, template_fei, workitem, params=nil) ldebug { "launch() request for #{template_fei.to_debug_s}" } rawexp, fei = fetch(template_fei) rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() if requesting_expression.kind_of? FlowExpressionId rawexp.parent_id = requesting_expression rawexp.fei.workflow_instance_id = \ "#{requesting_expression.workflow_instance_id}.0" elsif requesting_expression.kind_of? String rawexp.parent_id = nil rawexp.fei.workflow_instance_id = \ "#{requesting_expression}.0" else # kind is FlowExpression rawexp.parent_id = requesting_expression.fei rawexp.fei.workflow_instance_id = \ "#{requesting_expression.fei.workflow_instance_id}.0" end #ldebug do # "launch_template() spawning wfid " + # "#{rawexp.fei.workflow_instance_id.to_s}" #end env = rawexp.new_environment() params.each { |k, v| env[k] = v } if params # # the new scope gets its own environment rawexp.store_itself() rawexp.apply(workitem) end # # Evaluates a raw definition expression and # returns its body fei # def evaluate (rawExpression, workitem) exp = rawExpression.instantiate_real_expression(workitem) fei = exp.evaluate(workitem) remove(rawExpression) return fei end # # Applies a given expression (id or expression) # def apply (exp, workitem) exp, fei = fetch(exp) ldebug { "apply() '#{fei}' (#{fei.class})" } if not exp lwarn { "apply() cannot apply missing #{fei.to_debug_s}" } return end ldebug { "apply() #{fei.to_debug_s}" } workitem.last_expression_id = exp.fei exp.apply(workitem) end # # Cancels the given expression # def cancel (exp) exp, fei = fetch(exp) if not exp ldebug { "cancel() cannot cancel missing #{fei.to_debug_s}" } return nil end ldebug { "cancel() for #{fei.to_debug_s}" } inflowitem = exp.cancel() remove(exp) return inflowitem end # # Forgets the given expression (makes sure to substitute its # parent_id with the GONE_PARENT_ID constant) # def forget (exp) exp, fei = fetch(exp) return if not exp exp.parent_id = GONE_PARENT_ID exp.store_itself() end # # Replies to the parent of the given expression. # def reply_to_parent (exp, workitem) exp, fei = fetch(exp) workitem.last_expression_id = fei remove(exp, workitem) if not exp.parent_id ldebug do "reply_to_parent() process " + "#{exp.fei.workflow_instance_id} terminated" end else if exp.parent_id == GONE_PARENT_ID ldebug do "reply_to_parent() parent is gone for " + exp.fei.to_debug_s end else reply(exp.parent_id, workitem) end end end # # Triggers the reply expression of the expression given by its id. # def reply (exp, workitem) exp, fei = fetch(exp) ldebug { "reply() to #{fei.to_debug_s}" } ldebug { "reply() from #{workitem.last_expression_id}" } if not exp #raise "cannot reply to missing #{fei.to_debug_s}" lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" } return end exp.reply(workitem) end # # Adds or updates a flow expression in this pool # def update (flowExpression) get_expression_storage()[flowExpression.fei] = flowExpression end # # Fetches a FlowExpression from the pool. # Returns a tuple : the FlowExpression plus its FlowExpressionId. # # The param 'exp' may be a FlowExpressionId or a FlowExpression that # has to be reloaded. # def fetch (exp) synchronize do fei = exp if exp.kind_of? FlowExpression fei = exp.fei elsif not exp.kind_of? FlowExpressionId raise \ "Cannot fetch expression with key : "+ "'#{fei}' (#{fei.class})" end return get_expression_storage()[fei], fei end end # # Fetches a FlowExpression (returns only the FlowExpression instance) # # The param 'exp' may be a FlowExpressionId or a FlowExpression that # has to be reloaded. # def fetch_expression (exp) exp, _fei = fetch(exp) return exp end def fetch_engine_environment () synchronize do eei = engine_environment_id ee, fei = fetch(eei) if not ee ee = Environment\ .new(eei, nil, nil, @application_context, nil) ee.store_itself() end ldebug { "fetch_engine_environment() stored new ee" } return ee end end # # Removes a flow expression from the pool # (This method is mainly called from the pool itself) # def remove (exp, workitem=nil) exp, fei = fetch(exp) return if not exp ldebug { "remove() fe #{fei.to_debug_s}" } synchronize do @monitors.delete(fei) #get_expression_storage().delete(fei) get_expression_storage().remove(fei, workitem) if exp.owns_its_environment? remove_environment(exp.environment_id) end end end def engine_environment_id () synchronize do return @eei if @eei @eei = FlowExpressionId.new @eei.owfe_version = OPENWFE_VERSION @eei.engine_id = get_engine.service_name @eei.initial_engine_id = @eei.engine_id @eei.workflow_definition_url = 'ee' @eei.workflow_definition_name = 'ee' @eei.workflow_definition_revision = '0' @eei.workflow_instance_id = '0' @eei.expression_name = EN_ENVIRONMENT @eei.expression_id = '0' return @eei end end protected def evaluate_definition (raw_definition, workitem) expression = raw_definition.instantiate(workitem) end def remove_environment (environment_id) env, fei = fetch(environment_id) env.unbind() get_expression_storage().remove(environment_id, nil) end def build_workitem (launchitem) wi = InFlowWorkItem.new() wi.attributes = launchitem.attributes.dup() return wi end def fetch_definition (launchitem) wfdUrl = launchitem.workflow_definition_url #ldebug { "wfdUrl is '#{wfdUrl}'" } sDefinition = nil wfdField = nil if wfdUrl[0..5] == 'field:' wfdField = wfdUrl[6..-1] sDefinition = launchitem.attributes[wfdField] else sDefinition = NET::HTTP.get(URI.parse(wfdUrl)) end #ldebug { "sDefinition is \n#{sDefinition}" } launchitem.attributes.delete(wfdField) if wfdField if sDefinition.kind_of? String xmlRoot = REXML::Document.new(sDefinition).root class << xmlRoot def rawExpressionClass XmlRawExpression end end return xmlRoot end if sDefinition.kind_of? ProgExpRepresentation return sDefinition end if sDefinition.kind_of? ProcessDefinition return sDefinition.make() end if sDefinition.kind_of? Class return sDefinition.do_make(get_expression_map) end raise \ "Cannot deduce process definition " + "out of instance of class #{sDefinition.class}" end def new_fei (flow_url, flow_name, flow_revision, exp_name) fei = FlowExpressionId.new fei.owfe_version = OPENWFE_VERSION fei.engine_id = get_engine.service_name fei.initial_engine_id = fei.engine_id fei.workflow_definition_url = flow_url fei.workflow_definition_name = flow_name fei.workflow_definition_revision = flow_revision fei.workflow_instance_id = new_workflow_instance_id() fei.expression_id = "0" fei.expression_name = exp_name return fei end def new_workflow_instance_id () synchronize do wfid = OpenWFE::current_time_millis() wfid = wfid + 1 if wfid == @@last_given_instance_id @@last_given_instance_id = wfid return wfid.to_s end end # # Builds the RawExpression instance at the root of the flow # being launched. # def buildRawExpression (launchitem) procdef = fetch_definition(launchitem) flow_url = launchitem.workflow_definition_url flow_name = procdef.attributes['name'] flow_revision = procdef.attributes['revision'] exp_name = procdef.name fei = new_fei(flow_url, flow_name, flow_revision, exp_name) #puts procdef.rawExpressionClass #puts procdef.rawExpressionClass.public_methods return procdef.rawExpressionClass\ .new(fei, nil, nil, @application_context, procdef) end end end