lib/openwfe/expool/expressionpool.rb in openwferu-0.9.5 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.6
- old
+ new
@@ -39,19 +39,20 @@
# John Mettraux at openwfe.org
#
require 'uri'
require 'monitor'
-require 'net/http'
+require 'open-uri'
require 'rexml/document'
require 'openwfe/utils'
require 'openwfe/service'
require 'openwfe/logging'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
require 'openwfe/util/lru'
+require 'openwfe/util/safe'
require 'openwfe/util/observable'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw_xml'
include OpenWFE
@@ -126,10 +127,15 @@
ServiceMixin,
MonitorMixin,
OwfeServiceLocator,
Observable
+ SAFETY_LEVEL = 2
+ #
+ # code loaded from a remote URI will get evaluated with
+ # that security level
+
@@last_given_instance_id = -1
#
# storing at class level the last workflow instance id given
def initialize (service_name, application_context)
@@ -174,26 +180,35 @@
#
def launch (launchitem, async=false)
onotify :launch, launchitem.workflow_definition_url
- raw_expression = buildRawExpression(launchitem)
+ procdef = fetch_definition(launchitem)
+ raw_expression = build_raw_expression(
+ launchitem.workflow_definition_url, procdef)
+
+ raw_expression.new_environment()
+ #
+ # as this expression is the root of a new process instance,
+ # it has to have an environment for all the variables of
+ # the process instance
+
wi = build_workitem(launchitem)
fei = raw_expression.fei
if async
t = OpenWFE::call_in_thread "launch()", self do
- raw_expression.apply(wi)
+ apply(raw_expression, wi)
end
return fei, t
end
- raw_expression.apply(wi)
+ apply(raw_expression, wi)
return fei
end
#
@@ -205,12 +220,19 @@
#ldebug { "launch_template() of class #{template.class}" }
rawexp = nil
if template.kind_of? FlowExpressionId
+
rawexp, fei = fetch(template)
+
+ elsif template.kind_of? URI
+
+ rawexp = load_rawexp_from_uri(template)
+
else # template is of kind RawExpression
+
rawexp = template
end
ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" }
@@ -255,22 +277,22 @@
# "launch_template() spawning wfid " +
# "#{rawexp.fei.workflow_instance_id.to_s}"
#end
env = rawexp.new_environment()
-
+ #
params.each { |k, v| env[k] = v } if params
#
# the new scope gets its own environment
rawexp.store_itself()
workitem.flow_expression_id = rawexp.fei
fei = rawexp.fei
- rawexp.apply(workitem)
+ apply(rawexp, workitem)
return fei
end
#
@@ -293,25 +315,29 @@
#
# Applies a given expression (id or expression)
#
def apply (exp, workitem)
- exp, fei = fetch(exp)
+ exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId
#ldebug { "apply() '#{fei}' (#{fei.class})" }
if not exp
lwarn { "apply() cannot apply missing #{fei.to_debug_s}" }
return
end
#ldebug { "apply() #{fei.to_debug_s}" }
- onotify :apply, fei, workitem
+ #exp.apply_time = OpenWFE::now()
+ #
+ # this is done in RawExpression
workitem.flow_expression_id = exp.fei
+ onotify :apply, exp.fei, workitem
+
exp.apply(workitem)
end
#
# Cancels the given expression
@@ -508,33 +534,25 @@
# Removes a flow expression from the pool
# (This method is mainly called from the pool itself)
#
def remove (exp)
- #exp, fei = fetch(exp)
+ exp, _fei = fetch(exp) \
+ if exp.kind_of? FlowExpressionId
- if exp.kind_of? FlowExpressionId
- exp, _fei = fetch(exp)
- end
-
return if not exp
- ldebug { "remove() fe #{exp.fei.to_debug_s}" }
+ ldebug { "remove() fe #{exp.fei.to_debug_s}" }
onotify :remove, exp.fei
synchronize do
@monitors.delete(exp.fei)
- #get_expression_storage().delete(fei)
-
- if exp.owns_its_environment?
-
- remove_environment(exp.environment_id)
- end
-
+ remove_environment(exp.environment_id) \
+ if exp.owns_its_environment?
end
end
#
# This method is called at each expool (engine) [re]start.
@@ -584,16 +602,63 @@
@eei.expression_id = '0'
return @eei
end
end
+ #
+ # Returns the list of applied expressions belonging to a given
+ # workflow instance.
+ #
+ def get_flow_position (wfid)
+
+ raise "please provide a non-nil workflow instance id" \
+ unless wfid
+
+ result = []
+
+ get_expression_storage.real_each do |fei, fexp|
+
+ next if fexp.kind_of? Environment
+ next if fexp.kind_of? RawExpression
+ next unless fexp.apply_time
+
+ pi = fei.parent_wfid
+
+ next if pi != wfid
+
+ result << fexp
+ end
+
+ ldebug do
+ "get_flow_position() " +
+ "found #{result.size} exps for flow #{wfid}"
+ end
+
+ return result
+ end
+
protected
- def evaluate_definition (raw_definition, workitem)
- expression = raw_definition.instantiate(workitem)
+ #def evaluate_definition (raw_definition, workitem)
+ # expression = raw_definition.instantiate(workitem)
+ #end
+
+ #
+ # Prepares the RawExpression for a 'remote' process definition
+ # (a process definition pointed at via a URI).
+ #
+ def load_rawexp_from_uri (uri)
+
+ procdef = determine_representation(uri.read)
+
+ build_raw_expression(uri.to_s, procdef)
end
+ #
+ # Removes an environment, especially takes care of unbinding
+ # any special value it may contain.
+ #
def remove_environment (environment_id)
ldebug { "remove_environment() #{environment_id.to_debug_s}" }
env, fei = fetch(environment_id)
@@ -603,19 +668,57 @@
#get_expression_storage().delete(environment_id)
onotify :remove, environment_id
end
+ #
+ # Prepares a new instance of InFlowWorkItem from a LaunchItem
+ # instance.
+ #
def build_workitem (launchitem)
wi = InFlowWorkItem.new()
wi.attributes = launchitem.attributes.dup()
return wi
end
+ #
+ # Given a string that contains either an XML process
+ # definition, either a Ruby process definition
+ # will return a 'representation' (what is used to build
+ # a RawExpression instance).
+ #
+ def determine_representation (string_definition)
+
+ if string_definition[0, 1] == "<"
+
+ xmlRoot = REXML::Document.new(string_definition).root
+ class << xmlRoot
+ def raw_expression_class
+ XmlRawExpression
+ end
+ end
+ return xmlRoot
+ end
+
+ # else
+
+ #o = eval(string_definition)
+ o = OpenWFE::eval_safely(string_definition, SAFETY_LEVEL)
+ #o = OpenWFE::load_eval_safely(string_definition, SAFETY_LEVEL)
+ #o = OpenWFE::eval_r_safely(string_definition, SAFETY_LEVEL)
+
+ return o.do_make if o.kind_of? Class
+
+ return o
+ end
+
+ #
+ # Extracts the process definition that a launchitem points at.
+ #
def fetch_definition (launchitem)
wfdUrl = launchitem.workflow_definition_url
#ldebug { "wfdUrl is '#{wfdUrl}'" }
@@ -625,48 +728,40 @@
if wfdUrl[0..5] == 'field:'
wfdField = wfdUrl[6..-1]
sDefinition = launchitem.attributes[wfdField]
else
- sDefinition = NET::HTTP.get(URI.parse(wfdUrl))
+ sDefinition = URI.parse(wfdUrl).read
end
#ldebug { "sDefinition is \n#{sDefinition}" }
launchitem.attributes.delete(wfdField) if wfdField
- if sDefinition.kind_of? String
+ return determine_representation(sDefinition) \
+ if sDefinition.kind_of? String
- xmlRoot = REXML::Document.new(sDefinition).root
- class << xmlRoot
- def rawExpressionClass
- XmlRawExpression
- end
- end
- return xmlRoot
- end
+ return sDefinition \
+ if sDefinition.kind_of? ProgExpRepresentation
- if sDefinition.kind_of? ProgExpRepresentation
+ return sDefinition.make \
+ if sDefinition.kind_of? ProcessDefinition
- return sDefinition
- end
+ return sDefinition.do_make \
+ if sDefinition.kind_of? Class
- if sDefinition.kind_of? ProcessDefinition
+ # else...
- return sDefinition.make()
- end
-
- if sDefinition.kind_of? Class
-
- return sDefinition.do_make()
- end
-
raise \
"Cannot deduce process definition " +
"out of instance of class #{sDefinition.class}"
end
+ #
+ # Builds a FlowExpressionId instance for process being
+ # launched.
+ #
def new_fei (flow_url, flow_name, flow_revision, exp_name)
fei = FlowExpressionId.new
fei.owfe_version = OPENWFERU_VERSION
@@ -679,10 +774,13 @@
fei.expression_id = "0"
fei.expression_name = exp_name
return fei
end
+ #
+ # Returns a new unique workflow instance id
+ #
def new_workflow_instance_id ()
synchronize do
wfid = OpenWFE::current_time_millis()
@@ -698,24 +796,21 @@
#
# Builds the RawExpression instance at the root of the flow
# being launched.
#
- def buildRawExpression (launchitem)
+ def build_raw_expression (url, procdef)
- procdef = fetch_definition(launchitem)
-
- flow_url = launchitem.workflow_definition_url
flow_name = procdef.attributes['name']
flow_revision = procdef.attributes['revision']
exp_name = procdef.name
- fei = new_fei(flow_url, flow_name, flow_revision, exp_name)
+ fei = new_fei(url, flow_name, flow_revision, exp_name)
- #puts procdef.rawExpressionClass
- #puts procdef.rawExpressionClass.public_methods
+ #puts procdef.raw_expression_class
+ #puts procdef.raw_expression_class.public_methods
- return procdef.rawExpressionClass\
+ return procdef.raw_expression_class\
.new(fei, nil, nil, @application_context, procdef)
end
end