# # Copyright (c) 2006, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # # $Id: definitions.rb 2725 2006-06-02 13:26:32Z jmettraux $ # # # "made in Japan" # # John Mettraux at openwfe.org # require 'uri' require 'monitor' require 'net/http' require 'flowexpressionid' require 'ru/service' require 'ru/environment' require 'ru/fe_raw' require 'ru/rudefinitions' module OpenWFEru class ExpressionPool < Service include MonitorMixin @@last_given_instance_id = -1 # # storing at class level the last workflow instance id given #def initialize (serviceName, applicationContext) # super(serviceName, applicationContext) #end # # Instantiates a workflow definition and launches it. # def launch (launchitem) wi = build_workitem(launchitem) xmldef = fetch_definition(launchitem) fei = new_fei(launchitem.workflowDefinitionUrl, xmldef) rawExpression = RawExpression\ .new(fei, nil, nil, @application_context, xmldef) rawExpression.apply(wi) end # # launches a subprocess # def launch_template \ (requesting_expression, template_fei, workitem, params) ldebug { "launch() request for #{template_fei.to_debug_s}" } rawexp = fetch(template_fei) rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() if requesting_expression.kind_of? OpenWFE::FlowExpressionId rawexp.parent_id = requesting_expression rawexp.fei.workflowInstanceId = \ "#{requesting_expression.workflowInstanceId}.0" elsif requesting_expression.kind_of? String rawexp.parent_id = nil rawexp.fei.workflowInstanceId = \ "#{requesting_expression}.0" else # kind is FlowExpression rawexp.parent_id = requesting_expression.fei rawexp.fei.workflowInstanceId = \ "#{requesting_expression.fei.workflowInstanceId}.0" end #ldebug do # "launch_template() spawning wfid " + # "#{rawexp.fei.workflowInstanceId.to_s}" #end env = rawexp.new_environment() params.each { |k, v| env[k] = v } if params # # the new scope gets its own environment rawexp.store_itself() rawexp.apply(workitem) end # # Applies a given expression (id) # def apply (flowExpressionId, workitem) ldebug { "apply() #{flowExpressionId.to_debug_s}" } workitem.lastExpressionId = flowExpressionId exp = fetch(flowExpressionId) exp.apply(workitem) end # # Replies to the parent of the given expression. # def reply_to_parent (flowExpression, workitem) workitem.lastExpressionId = flowExpression.fei remove(flowExpression) if flowExpression.parent_id reply(flowExpression.parent_id, workitem) else ldebug do "reply_to_parent() process #{flowExpression.fei.workflowInstanceId} terminated" #get_expression_storage().to_s end end end # # Triggers the reply expression of the expression given by its id. # def reply (flowExpressionId, workitem) ldebug { "reply() #{flowExpressionId.to_debug_s}" } exp = fetch(flowExpressionId) exp.reply(workitem) end # # Adds or updates a flow expression in this pool # def update (flowExpression) get_expression_storage()[flowExpression.fei] = flowExpression #ldebug do # "update() sz #{get_expression_storage.length} "+ # "#{flowExpression.fei.to_debug_s}" #end #ldebug do # "update() sz #{get_expression_storage.length} "+ # "#{flowExpression.fei.to_s}\n"+ # get_expression_storage().to_s #end #ldebug { "update()\n" + get_expression_storage().to_s } end # # Fetches a flowExpression from the pool # def fetch (flowExpressionId) exp = get_expression_storage()[flowExpressionId] #ldebug { "fetch() did not find #{flowExpressionId.to_debug_s}" } \ # if not exp return exp end def fetch_engine_environment () synchronize do eei = engine_environment_id ee = fetch(eei) if not ee ee = Environment\ .new(eei, nil, nil, @application_context, nil) ee.store_itself() end ldebug { "fetch_engine_environment() stored new ee" } return ee end end # # Removes a flow expression from the pool # (This method is mainly called from the pool itself) # def remove (flowExpression) if flowExpression.kind_of? OpenWFE::FlowExpressionId ldebug { "remove() fei #{flowExpression.to_debug_s}" } flowExpression = fetch(flowExpression) end return if not flowExpression ldebug { "remove() fe #{flowExpression.fei.to_debug_s}" } #ldebug do # "remove() #{flowExpression.fei.to_debug_s}" # #"remove() sz before #{get_expression_storage.length}\n" + # #get_expression_storage().to_s #end get_expression_storage().delete(flowExpression.fei) if flowExpression.owns_its_environment? remove_environment(flowExpression.environment_id) end #ldebug do # "remove() sz after #{get_expression_storage.length}\n" + # get_expression_storage().to_s #end end def engine_environment_id () synchronize do return @eei if @eei @eei = OpenWFE::FlowExpressionId.new @eei.owfeVersion = OPENWFE_VERSION @eei.engineId = lookup(Engine).service_name @eei.initialEngineId = @eei.engineId @eei.workflowDefinitionUrl = 'ee' @eei.workflowDefinitionName = 'ee' @eei.workflowDefinitionRevision = '0' @eei.workflowInstanceId = '0' @eei.expressionName = EN_ENVIRONMENT @eei.expressionId = '0' return @eei end end def get_expression_storage () return @application_context[S_EXPRESSION_STORAGE] end protected def remove_environment (environment_id) env = fetch(environment_id) env.unbind() get_expression_storage().delete(environment_id) end def build_workitem (launchitem) wi = OpenWFE::InFlowWorkItem.new() wi.attributes = launchitem.attributes.dup() return wi end def fetch_definition (launchitem) wfdUrl = launchitem.workflowDefinitionUrl #ldebug { "wfdUrl is '#{wfdUrl}'" } sDefinition = nil if wfdUrl[0..5] == 'field:' sDefinition = launchitem.attributes[wfdUrl[6..-1]] else sDefinition = NET::HTTP.get(URI.parse(wfdUrl)) end #ldebug { "sDefinition is \n#{sDefinition}" } return REXML::Document.new(sDefinition).root end def new_fei (flow_url, xml_element) fei = OpenWFE::FlowExpressionId.new fei.owfeVersion = OPENWFE_VERSION fei.engineId = lookup(Engine).service_name fei.initialEngineId = fei.engineId fei.workflowDefinitionUrl = flow_url fei.workflowDefinitionName = \ xml_element.attributes['name'].to_str fei.workflowDefinitionRevision = \ xml_element.attributes['revision'].to_str fei.workflowInstanceId = new_workflow_instance_id() fei.expressionId = "0" fei.expressionName = xml_element.name return fei end def new_workflow_instance_id () synchronize do wfid = OpenWFE::current_time_millis() wfid = wfid + 1 if wfid == @@last_given_instance_id @@last_given_instance_id = wfid return wfid.to_s end end end end