lib/openwfe/expool/expressionpool.rb in openwferu-0.9.6 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.7

- old
+ new

@@ -52,10 +52,11 @@ require 'openwfe/util/lru' require 'openwfe/util/safe' require 'openwfe/util/observable' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw_xml' +require 'openwfe/expressions/raw_prog' include OpenWFE module OpenWFE @@ -104,10 +105,11 @@ # This special flow expression id is used by the forget() method # (which is used by the forget expression and the concurrence # synchronization expressions) # GONE_PARENT_ID = FlowExpressionId.new + GONE_PARENT_ID.owfe_version = "any" GONE_PARENT_ID.engine_id = GONE GONE_PARENT_ID.initial_engine_id = GONE GONE_PARENT_ID.workflow_definition_url = GONE GONE_PARENT_ID.workflow_definition_name = GONE GONE_PARENT_ID.workflow_definition_revision = GONE @@ -132,14 +134,10 @@ SAFETY_LEVEL = 2 # # code loaded from a remote URI will get evaluated with # that security level - @@last_given_instance_id = -1 - # - # storing at class level the last workflow instance id given - def initialize (service_name, application_context) super() service_init(service_name, application_context) @@ -167,10 +165,44 @@ def get_monitor (fei) return @monitors[fei] end # + # This method is called by the launch method. It's actually the first + # stage of that method. + # It may be interessant to use to 'validate' a launchitem and its + # process definition, as it will raise an exception in case + # of 'parameter' mismatch. + # + # There is a 'pre_launch_check' alias for this method in the + # Engine class. + # + def prepare_raw_expression (launchitem) + + wfdurl = launchitem.workflow_definition_url + + definition = if wfdurl.match "^field:" + wfdfield = wfdurl[6..-1] + launchitem.attributes.delete(wfdfield) + else + read_uri(wfdurl) + end + + raise "didn't find process definition at '#{wfdurl}'" \ + unless definition + + raw_expression = build_raw_expression(wfdurl, definition) + + raw_expression.check_parameters(launchitem) + # + # will raise an exception if there are requirements + # and one of them is not met + + raw_expression + end + + # # Instantiates a workflow definition and launches it. # # If async is set to true, the launch will occur in its own thread. # # Returns the FlowExpressionId instance of the root expression of @@ -178,27 +210,27 @@ # If async is set to true, returns a tuple FlowExpressionId / # Thread instance used. # def launch (launchitem, async=false) - onotify :launch, launchitem.workflow_definition_url + raw_expression = prepare_raw_expression(launchitem) + # + # will raise an exception if there are requirements + # and one of them is not met - procdef = fetch_definition(launchitem) - - raw_expression = build_raw_expression( - launchitem.workflow_definition_url, procdef) - raw_expression.new_environment() # # as this expression is the root of a new process instance, # it has to have an environment for all the variables of # the process instance wi = build_workitem(launchitem) fei = raw_expression.fei + onotify :launch, launchitem.workflow_definition_url + if async t = OpenWFE::call_in_thread "launch()", self do apply(raw_expression, wi) end @@ -207,46 +239,41 @@ end apply(raw_expression, wi) return fei + #return fei, Thread.current end # # launches a subprocess # def launch_template ( requesting_expression, sub_id, template, workitem, params=nil) - #ldebug { "launch_template() of class #{template.class}" } - - rawexp = nil - - if template.kind_of? FlowExpressionId - - rawexp, fei = fetch(template) - - elsif template.kind_of? URI - - rawexp = load_rawexp_from_uri(template) - - else # template is of kind RawExpression - - rawexp = template + rawexp = if template.is_a? RawExpression + template + elsif template.is_a? FlowExpressionId + fetch_expression(template) + else + build_raw_expression("no-url", template) end + #raise "did not find expression at #{template.to_s}" \ + # unless rawexp + ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" } onotify :launch_template, rawexp.fei rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() if requesting_expression == nil rawexp.parent_id = nil - rawexp.fei.workflow_instance_id = new_workflow_instance_id() + rawexp.fei.workflow_instance_id = get_wfid_generator.generate elsif requesting_expression.kind_of? FlowExpressionId rawexp.parent_id = requesting_expression rawexp.fei.workflow_instance_id = \ @@ -254,11 +281,11 @@ elsif requesting_expression.kind_of? String rawexp.parent_id = nil rawexp.fei.workflow_instance_id = \ - "#{requesting_expression}.${sub_id}" + "#{requesting_expression}.#{sub_id}" else # kind is FlowExpression rawexp.parent_id = requesting_expression.fei rawexp.fei.workflow_instance_id = \ @@ -338,17 +365,19 @@ exp.apply(workitem) end # - # Cancels the given expression + # Cancels the given expression. + # The param might be an expression instance or a FlowExpressionId + # instance. # def cancel (exp) exp, fei = fetch(exp) - if not exp + unless exp ldebug { "cancel() cannot cancel missing #{fei.to_debug_s}" } return nil end ldebug { "cancel() for #{fei.to_debug_s}" } @@ -360,20 +389,20 @@ return inflowitem end # - # Given any expression of a process, cancels the complete process. + # Given any expression of a process, cancels the complete process + # instance. # - def cancel_flow (exp) + def cancel_flow (exp_or_wfid) - ldebug { "cancel_flow() from #{exp}" } + ldebug { "cancel_flow() from #{exp_or_wfid}" } - root = fetch_root(exp) + root = fetch_root(exp_or_wfid) cancel(root) end - alias :cancel_process :cancel_flow # # Forgets the given expression (makes sure to substitute its # parent_id with the GONE_PARENT_ID constant) @@ -411,31 +440,37 @@ if not exp.parent_id ldebug do "reply_to_parent() process " + "#{exp.fei.workflow_instance_id} terminated" end - else - if exp.parent_id == GONE_PARENT_ID - ldebug do - "reply_to_parent() parent is gone for " + - exp.fei.to_debug_s - end - else - reply(exp.parent_id, workitem) + + onotify :terminate, exp.fei + + return + end + + if exp.parent_id == GONE_PARENT_ID + ldebug do + "reply_to_parent() parent is gone for " + + exp.fei.to_debug_s end + + return end + + reply(exp.parent_id, workitem) end # # Triggers the reply expression of the expression given by its id. # def reply (exp, workitem) exp, fei = fetch(exp) ldebug { "reply() to #{fei.to_debug_s}" } - ldebug { "reply() from #{workitem.last_expression_id}" } + ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" } if not exp #raise "cannot reply to missing #{fei.to_debug_s}" lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" } return @@ -447,21 +482,21 @@ end # # Adds or updates a flow expression in this pool # - def update (flowExpression) + def update (flow_expression) + #ldebug { "update() for #{flow_expression.fei.to_debug_s}" } + t = Timer.new - onotify :update, flowExpression.fei, flowExpression + onotify :update, flow_expression.fei, flow_expression - #get_expression_storage()[flowExpression.fei] = flowExpression - ldebug { "update() took #{t.duration} ms" } - return flowExpression + flow_expression end # # Fetches a FlowExpression from the pool. # Returns a tuple : the FlowExpression plus its FlowExpressionId. @@ -501,14 +536,19 @@ return exp end # # Fetches the root expression of a process (given any of its - # expressions). + # expressions or its wfid). # - def fetch_root (exp) - exp = fetch_expression(exp) + def fetch_root (exp_or_wfid) + + return fetch_expression_with_wfid(exp_or_wfid) \ + if exp_or_wfid.is_a? String + + exp = fetch_expression(exp_or_wfid) + return exp unless exp.parent_id return fetch_root(fetch_expression(exp.parent_id)) end # @@ -631,31 +671,59 @@ ldebug do "get_flow_position() " + "found #{result.size} exps for flow #{wfid}" end - return result + result end + # + # Lists all workflows (processes) currently in the expool (in + # the engine). + # This method will return a list of "process-definition" expressions + # (root of flows). + # + # If consider_subprocesses is set to true, "process-definition" + # expressions of subprocesses will be returned as well. + # + # "wfid_prefix" allows your to query for specific workflow instance + # id prefixes. + # + def list_workflows (consider_subprocesses=false, wfid_prefix=nil) + + result = [] + + get_expression_storage.real_each do |fei, fexp| + + next unless fexp.is_a? DefineExpression + + next if not consider_subprocesses and fei.wfid.index(".") + + next unless fei.wfid.match("^#{wfid_prefix}") if wfid_prefix + + result << fexp + end + + result + end + alias :list_processes :list_workflows + + # + # Returns the first expression found with the given wfid. + # + def fetch_expression_with_wfid (wfid) + + list_workflows(false, wfid)[0] + end + protected #def evaluate_definition (raw_definition, workitem) # expression = raw_definition.instantiate(workitem) #end # - # Prepares the RawExpression for a 'remote' process definition - # (a process definition pointed at via a URI). - # - def load_rawexp_from_uri (uri) - - procdef = determine_representation(uri.read) - - build_raw_expression(uri.to_s, procdef) - end - - # # Removes an environment, especially takes care of unbinding # any special value it may contain. # def remove_environment (environment_id) @@ -682,80 +750,81 @@ return wi end # - # Given a string that contains either an XML process - # definition, either a Ruby process definition - # will return a 'representation' (what is used to build - # a RawExpression instance). + # This is the only point in the expression pool where an URI + # is read, so this is where the :remote_definitions_allowed + # security check is enforced. # - def determine_representation (string_definition) + def read_uri (uri) - if string_definition[0, 1] == "<" + uri = uri.to_s + uri = uri[5..-1] if uri.match("^file:") + uri = URI.parse(uri) - xmlRoot = REXML::Document.new(string_definition).root - class << xmlRoot - def raw_expression_class - XmlRawExpression - end - end - return xmlRoot + if uri.scheme + raise "loading remote definitions is not allowed" \ + if ac[:remote_definitions_allowed] != true end - # else - - #o = eval(string_definition) - o = OpenWFE::eval_safely(string_definition, SAFETY_LEVEL) - #o = OpenWFE::load_eval_safely(string_definition, SAFETY_LEVEL) - #o = OpenWFE::eval_r_safely(string_definition, SAFETY_LEVEL) - - return o.do_make if o.kind_of? Class - - return o + open(uri.to_s).read end # - # Extracts the process definition that a launchitem points at. + # The parameter to this method might be either a process + # definition (in any form) or a LaunchItem. # - def fetch_definition (launchitem) + # Will return a 'representation' (what is used to build + # a RawExpression instance). + # + def determine_representation (param) - wfdUrl = launchitem.workflow_definition_url + #ldebug do + # "determine_representation() from class #{param.class.name}" + #end - #ldebug { "wfdUrl is '#{wfdUrl}'" } + param = read_uri(param) if param.is_a? URI - sDefinition = nil - wfdField = nil + #ldebug do + # "determine_representation() " + + # "param of class #{param.class.name}" + #end - if wfdUrl[0..5] == 'field:' - wfdField = wfdUrl[6..-1] - sDefinition = launchitem.attributes[wfdField] - else - sDefinition = URI.parse(wfdUrl).read - end + return param if param.is_a? ProgExpRepresentation + return param.make if param.is_a? ProcessDefinition + return param.do_make if param.is_a? Class - #ldebug { "sDefinition is \n#{sDefinition}" } + raise "cannot handle definition of class #{param.class.name}" \ + unless param.is_a? String - launchitem.attributes.delete(wfdField) if wfdField + if param[0, 1] == "<" + # + # XML definition - return determine_representation(sDefinition) \ - if sDefinition.kind_of? String + xmlRoot = REXML::Document.new(param).root + class << xmlRoot + def raw_expression_class + XmlRawExpression + end + end + return xmlRoot + end - return sDefinition \ - if sDefinition.kind_of? ProgExpRepresentation + return YAML.load(s) if param.match("^--- .") + # + # something that was dumped via YAML - return sDefinition.make \ - if sDefinition.kind_of? ProcessDefinition + # + # else it's some ruby code to eval - return sDefinition.do_make \ - if sDefinition.kind_of? Class + o = OpenWFE::eval_safely(param, SAFETY_LEVEL) - # else... + return o.make if o.is_a? ProcessDefinition + return o.do_make if o.kind_of? Class - raise \ - "Cannot deduce process definition " + - "out of instance of class #{sDefinition.class}" + o end # # Builds a FlowExpressionId instance for process being # launched. @@ -763,43 +832,32 @@ def new_fei (flow_url, flow_name, flow_revision, exp_name) fei = FlowExpressionId.new fei.owfe_version = OPENWFERU_VERSION - fei.engine_id = get_engine.service_name - fei.initial_engine_id = fei.engine_id - fei.workflow_definition_url = flow_url - fei.workflow_definition_name = flow_name - fei.workflow_definition_revision = flow_revision - fei.workflow_instance_id = new_workflow_instance_id() + fei.engine_id = OpenWFE::stu get_engine.service_name + fei.initial_engine_id = OpenWFE::stu fei.engine_id + fei.workflow_definition_url = OpenWFE::stu flow_url + fei.workflow_definition_name = OpenWFE::stu flow_name + fei.workflow_definition_revision = OpenWFE::stu flow_revision + fei.workflow_instance_id = get_wfid_generator.generate fei.expression_id = "0" fei.expression_name = exp_name return fei end # - # Returns a new unique workflow instance id - # - def new_workflow_instance_id () - - synchronize do - - wfid = OpenWFE::current_time_millis() - - wfid = @@last_given_instance_id + 1 \ - if wfid <= @@last_given_instance_id - - @@last_given_instance_id = wfid - - return wfid.to_s - end - end - - # # Builds the RawExpression instance at the root of the flow # being launched. # - def build_raw_expression (url, procdef) + # The param can be a LaunchItem, an URI, anything accepted + # by the determine_representation() method. + # + def build_raw_expression (url, param) + + procdef = determine_representation(param) + + #return procdef if procdef.is_a? RawExpression flow_name = procdef.attributes['name'] flow_revision = procdef.attributes['revision'] exp_name = procdef.name