lib/openwfe/expool/expressionpool.rb in openwferu-0.9.5 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.6

- old
+ new

@@ -39,19 +39,20 @@ # John Mettraux at openwfe.org # require 'uri' require 'monitor' -require 'net/http' +require 'open-uri' require 'rexml/document' require 'openwfe/utils' require 'openwfe/service' require 'openwfe/logging' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' require 'openwfe/util/lru' +require 'openwfe/util/safe' require 'openwfe/util/observable' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw_xml' include OpenWFE @@ -126,10 +127,15 @@ ServiceMixin, MonitorMixin, OwfeServiceLocator, Observable + SAFETY_LEVEL = 2 + # + # code loaded from a remote URI will get evaluated with + # that security level + @@last_given_instance_id = -1 # # storing at class level the last workflow instance id given def initialize (service_name, application_context) @@ -174,26 +180,35 @@ # def launch (launchitem, async=false) onotify :launch, launchitem.workflow_definition_url - raw_expression = buildRawExpression(launchitem) + procdef = fetch_definition(launchitem) + raw_expression = build_raw_expression( + launchitem.workflow_definition_url, procdef) + + raw_expression.new_environment() + # + # as this expression is the root of a new process instance, + # it has to have an environment for all the variables of + # the process instance + wi = build_workitem(launchitem) fei = raw_expression.fei if async t = OpenWFE::call_in_thread "launch()", self do - raw_expression.apply(wi) + apply(raw_expression, wi) end return fei, t end - raw_expression.apply(wi) + apply(raw_expression, wi) return fei end # @@ -205,12 +220,19 @@ #ldebug { "launch_template() of class #{template.class}" } rawexp = nil if template.kind_of? FlowExpressionId + rawexp, fei = fetch(template) + + elsif template.kind_of? URI + + rawexp = load_rawexp_from_uri(template) + else # template is of kind RawExpression + rawexp = template end ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" } @@ -255,22 +277,22 @@ # "launch_template() spawning wfid " + # "#{rawexp.fei.workflow_instance_id.to_s}" #end env = rawexp.new_environment() - + # params.each { |k, v| env[k] = v } if params # # the new scope gets its own environment rawexp.store_itself() workitem.flow_expression_id = rawexp.fei fei = rawexp.fei - rawexp.apply(workitem) + apply(rawexp, workitem) return fei end # @@ -293,25 +315,29 @@ # # Applies a given expression (id or expression) # def apply (exp, workitem) - exp, fei = fetch(exp) + exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId #ldebug { "apply() '#{fei}' (#{fei.class})" } if not exp lwarn { "apply() cannot apply missing #{fei.to_debug_s}" } return end #ldebug { "apply() #{fei.to_debug_s}" } - onotify :apply, fei, workitem + #exp.apply_time = OpenWFE::now() + # + # this is done in RawExpression workitem.flow_expression_id = exp.fei + onotify :apply, exp.fei, workitem + exp.apply(workitem) end # # Cancels the given expression @@ -508,33 +534,25 @@ # Removes a flow expression from the pool # (This method is mainly called from the pool itself) # def remove (exp) - #exp, fei = fetch(exp) + exp, _fei = fetch(exp) \ + if exp.kind_of? FlowExpressionId - if exp.kind_of? FlowExpressionId - exp, _fei = fetch(exp) - end - return if not exp - ldebug { "remove() fe #{exp.fei.to_debug_s}" } + ldebug { "remove() fe #{exp.fei.to_debug_s}" } onotify :remove, exp.fei synchronize do @monitors.delete(exp.fei) - #get_expression_storage().delete(fei) - - if exp.owns_its_environment? - - remove_environment(exp.environment_id) - end - + remove_environment(exp.environment_id) \ + if exp.owns_its_environment? end end # # This method is called at each expool (engine) [re]start. @@ -584,16 +602,63 @@ @eei.expression_id = '0' return @eei end end + # + # Returns the list of applied expressions belonging to a given + # workflow instance. + # + def get_flow_position (wfid) + + raise "please provide a non-nil workflow instance id" \ + unless wfid + + result = [] + + get_expression_storage.real_each do |fei, fexp| + + next if fexp.kind_of? Environment + next if fexp.kind_of? RawExpression + next unless fexp.apply_time + + pi = fei.parent_wfid + + next if pi != wfid + + result << fexp + end + + ldebug do + "get_flow_position() " + + "found #{result.size} exps for flow #{wfid}" + end + + return result + end + protected - def evaluate_definition (raw_definition, workitem) - expression = raw_definition.instantiate(workitem) + #def evaluate_definition (raw_definition, workitem) + # expression = raw_definition.instantiate(workitem) + #end + + # + # Prepares the RawExpression for a 'remote' process definition + # (a process definition pointed at via a URI). + # + def load_rawexp_from_uri (uri) + + procdef = determine_representation(uri.read) + + build_raw_expression(uri.to_s, procdef) end + # + # Removes an environment, especially takes care of unbinding + # any special value it may contain. + # def remove_environment (environment_id) ldebug { "remove_environment() #{environment_id.to_debug_s}" } env, fei = fetch(environment_id) @@ -603,19 +668,57 @@ #get_expression_storage().delete(environment_id) onotify :remove, environment_id end + # + # Prepares a new instance of InFlowWorkItem from a LaunchItem + # instance. + # def build_workitem (launchitem) wi = InFlowWorkItem.new() wi.attributes = launchitem.attributes.dup() return wi end + # + # Given a string that contains either an XML process + # definition, either a Ruby process definition + # will return a 'representation' (what is used to build + # a RawExpression instance). + # + def determine_representation (string_definition) + + if string_definition[0, 1] == "<" + + xmlRoot = REXML::Document.new(string_definition).root + class << xmlRoot + def raw_expression_class + XmlRawExpression + end + end + return xmlRoot + end + + # else + + #o = eval(string_definition) + o = OpenWFE::eval_safely(string_definition, SAFETY_LEVEL) + #o = OpenWFE::load_eval_safely(string_definition, SAFETY_LEVEL) + #o = OpenWFE::eval_r_safely(string_definition, SAFETY_LEVEL) + + return o.do_make if o.kind_of? Class + + return o + end + + # + # Extracts the process definition that a launchitem points at. + # def fetch_definition (launchitem) wfdUrl = launchitem.workflow_definition_url #ldebug { "wfdUrl is '#{wfdUrl}'" } @@ -625,48 +728,40 @@ if wfdUrl[0..5] == 'field:' wfdField = wfdUrl[6..-1] sDefinition = launchitem.attributes[wfdField] else - sDefinition = NET::HTTP.get(URI.parse(wfdUrl)) + sDefinition = URI.parse(wfdUrl).read end #ldebug { "sDefinition is \n#{sDefinition}" } launchitem.attributes.delete(wfdField) if wfdField - if sDefinition.kind_of? String + return determine_representation(sDefinition) \ + if sDefinition.kind_of? String - xmlRoot = REXML::Document.new(sDefinition).root - class << xmlRoot - def rawExpressionClass - XmlRawExpression - end - end - return xmlRoot - end + return sDefinition \ + if sDefinition.kind_of? ProgExpRepresentation - if sDefinition.kind_of? ProgExpRepresentation + return sDefinition.make \ + if sDefinition.kind_of? ProcessDefinition - return sDefinition - end + return sDefinition.do_make \ + if sDefinition.kind_of? Class - if sDefinition.kind_of? ProcessDefinition + # else... - return sDefinition.make() - end - - if sDefinition.kind_of? Class - - return sDefinition.do_make() - end - raise \ "Cannot deduce process definition " + "out of instance of class #{sDefinition.class}" end + # + # Builds a FlowExpressionId instance for process being + # launched. + # def new_fei (flow_url, flow_name, flow_revision, exp_name) fei = FlowExpressionId.new fei.owfe_version = OPENWFERU_VERSION @@ -679,10 +774,13 @@ fei.expression_id = "0" fei.expression_name = exp_name return fei end + # + # Returns a new unique workflow instance id + # def new_workflow_instance_id () synchronize do wfid = OpenWFE::current_time_millis() @@ -698,24 +796,21 @@ # # Builds the RawExpression instance at the root of the flow # being launched. # - def buildRawExpression (launchitem) + def build_raw_expression (url, procdef) - procdef = fetch_definition(launchitem) - - flow_url = launchitem.workflow_definition_url flow_name = procdef.attributes['name'] flow_revision = procdef.attributes['revision'] exp_name = procdef.name - fei = new_fei(flow_url, flow_name, flow_revision, exp_name) + fei = new_fei(url, flow_name, flow_revision, exp_name) - #puts procdef.rawExpressionClass - #puts procdef.rawExpressionClass.public_methods + #puts procdef.raw_expression_class + #puts procdef.raw_expression_class.public_methods - return procdef.rawExpressionClass\ + return procdef.raw_expression_class\ .new(fei, nil, nil, @application_context, procdef) end end