lib/openwfe/expool/expressionpool.rb in openwferu-0.9.6 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.7
- old
+ new
@@ -52,10 +52,11 @@
require 'openwfe/util/lru'
require 'openwfe/util/safe'
require 'openwfe/util/observable'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw_xml'
+require 'openwfe/expressions/raw_prog'
include OpenWFE
module OpenWFE
@@ -104,10 +105,11 @@
# This special flow expression id is used by the forget() method
# (which is used by the forget expression and the concurrence
# synchronization expressions)
#
GONE_PARENT_ID = FlowExpressionId.new
+ GONE_PARENT_ID.owfe_version = "any"
GONE_PARENT_ID.engine_id = GONE
GONE_PARENT_ID.initial_engine_id = GONE
GONE_PARENT_ID.workflow_definition_url = GONE
GONE_PARENT_ID.workflow_definition_name = GONE
GONE_PARENT_ID.workflow_definition_revision = GONE
@@ -132,14 +134,10 @@
SAFETY_LEVEL = 2
#
# code loaded from a remote URI will get evaluated with
# that security level
- @@last_given_instance_id = -1
- #
- # storing at class level the last workflow instance id given
-
def initialize (service_name, application_context)
super()
service_init(service_name, application_context)
@@ -167,10 +165,44 @@
def get_monitor (fei)
return @monitors[fei]
end
#
+ # This method is called by the launch method. It's actually the first
+ # stage of that method.
+ # It may be interessant to use to 'validate' a launchitem and its
+ # process definition, as it will raise an exception in case
+ # of 'parameter' mismatch.
+ #
+ # There is a 'pre_launch_check' alias for this method in the
+ # Engine class.
+ #
+ def prepare_raw_expression (launchitem)
+
+ wfdurl = launchitem.workflow_definition_url
+
+ definition = if wfdurl.match "^field:"
+ wfdfield = wfdurl[6..-1]
+ launchitem.attributes.delete(wfdfield)
+ else
+ read_uri(wfdurl)
+ end
+
+ raise "didn't find process definition at '#{wfdurl}'" \
+ unless definition
+
+ raw_expression = build_raw_expression(wfdurl, definition)
+
+ raw_expression.check_parameters(launchitem)
+ #
+ # will raise an exception if there are requirements
+ # and one of them is not met
+
+ raw_expression
+ end
+
+ #
# Instantiates a workflow definition and launches it.
#
# If async is set to true, the launch will occur in its own thread.
#
# Returns the FlowExpressionId instance of the root expression of
@@ -178,27 +210,27 @@
# If async is set to true, returns a tuple FlowExpressionId /
# Thread instance used.
#
def launch (launchitem, async=false)
- onotify :launch, launchitem.workflow_definition_url
+ raw_expression = prepare_raw_expression(launchitem)
+ #
+ # will raise an exception if there are requirements
+ # and one of them is not met
- procdef = fetch_definition(launchitem)
-
- raw_expression = build_raw_expression(
- launchitem.workflow_definition_url, procdef)
-
raw_expression.new_environment()
#
# as this expression is the root of a new process instance,
# it has to have an environment for all the variables of
# the process instance
wi = build_workitem(launchitem)
fei = raw_expression.fei
+ onotify :launch, launchitem.workflow_definition_url
+
if async
t = OpenWFE::call_in_thread "launch()", self do
apply(raw_expression, wi)
end
@@ -207,46 +239,41 @@
end
apply(raw_expression, wi)
return fei
+ #return fei, Thread.current
end
#
# launches a subprocess
#
def launch_template (
requesting_expression, sub_id, template, workitem, params=nil)
- #ldebug { "launch_template() of class #{template.class}" }
-
- rawexp = nil
-
- if template.kind_of? FlowExpressionId
-
- rawexp, fei = fetch(template)
-
- elsif template.kind_of? URI
-
- rawexp = load_rawexp_from_uri(template)
-
- else # template is of kind RawExpression
-
- rawexp = template
+ rawexp = if template.is_a? RawExpression
+ template
+ elsif template.is_a? FlowExpressionId
+ fetch_expression(template)
+ else
+ build_raw_expression("no-url", template)
end
+ #raise "did not find expression at #{template.to_s}" \
+ # unless rawexp
+
ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" }
onotify :launch_template, rawexp.fei
rawexp = rawexp.dup()
rawexp.fei = rawexp.fei.dup()
if requesting_expression == nil
rawexp.parent_id = nil
- rawexp.fei.workflow_instance_id = new_workflow_instance_id()
+ rawexp.fei.workflow_instance_id = get_wfid_generator.generate
elsif requesting_expression.kind_of? FlowExpressionId
rawexp.parent_id = requesting_expression
rawexp.fei.workflow_instance_id = \
@@ -254,11 +281,11 @@
elsif requesting_expression.kind_of? String
rawexp.parent_id = nil
rawexp.fei.workflow_instance_id = \
- "#{requesting_expression}.${sub_id}"
+ "#{requesting_expression}.#{sub_id}"
else # kind is FlowExpression
rawexp.parent_id = requesting_expression.fei
rawexp.fei.workflow_instance_id = \
@@ -338,17 +365,19 @@
exp.apply(workitem)
end
#
- # Cancels the given expression
+ # Cancels the given expression.
+ # The param might be an expression instance or a FlowExpressionId
+ # instance.
#
def cancel (exp)
exp, fei = fetch(exp)
- if not exp
+ unless exp
ldebug { "cancel() cannot cancel missing #{fei.to_debug_s}" }
return nil
end
ldebug { "cancel() for #{fei.to_debug_s}" }
@@ -360,20 +389,20 @@
return inflowitem
end
#
- # Given any expression of a process, cancels the complete process.
+ # Given any expression of a process, cancels the complete process
+ # instance.
#
- def cancel_flow (exp)
+ def cancel_flow (exp_or_wfid)
- ldebug { "cancel_flow() from #{exp}" }
+ ldebug { "cancel_flow() from #{exp_or_wfid}" }
- root = fetch_root(exp)
+ root = fetch_root(exp_or_wfid)
cancel(root)
end
-
alias :cancel_process :cancel_flow
#
# Forgets the given expression (makes sure to substitute its
# parent_id with the GONE_PARENT_ID constant)
@@ -411,31 +440,37 @@
if not exp.parent_id
ldebug do
"reply_to_parent() process " +
"#{exp.fei.workflow_instance_id} terminated"
end
- else
- if exp.parent_id == GONE_PARENT_ID
- ldebug do
- "reply_to_parent() parent is gone for " +
- exp.fei.to_debug_s
- end
- else
- reply(exp.parent_id, workitem)
+
+ onotify :terminate, exp.fei
+
+ return
+ end
+
+ if exp.parent_id == GONE_PARENT_ID
+ ldebug do
+ "reply_to_parent() parent is gone for " +
+ exp.fei.to_debug_s
end
+
+ return
end
+
+ reply(exp.parent_id, workitem)
end
#
# Triggers the reply expression of the expression given by its id.
#
def reply (exp, workitem)
exp, fei = fetch(exp)
ldebug { "reply() to #{fei.to_debug_s}" }
- ldebug { "reply() from #{workitem.last_expression_id}" }
+ ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" }
if not exp
#raise "cannot reply to missing #{fei.to_debug_s}"
lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" }
return
@@ -447,21 +482,21 @@
end
#
# Adds or updates a flow expression in this pool
#
- def update (flowExpression)
+ def update (flow_expression)
+ #ldebug { "update() for #{flow_expression.fei.to_debug_s}" }
+
t = Timer.new
- onotify :update, flowExpression.fei, flowExpression
+ onotify :update, flow_expression.fei, flow_expression
- #get_expression_storage()[flowExpression.fei] = flowExpression
-
ldebug { "update() took #{t.duration} ms" }
- return flowExpression
+ flow_expression
end
#
# Fetches a FlowExpression from the pool.
# Returns a tuple : the FlowExpression plus its FlowExpressionId.
@@ -501,14 +536,19 @@
return exp
end
#
# Fetches the root expression of a process (given any of its
- # expressions).
+ # expressions or its wfid).
#
- def fetch_root (exp)
- exp = fetch_expression(exp)
+ def fetch_root (exp_or_wfid)
+
+ return fetch_expression_with_wfid(exp_or_wfid) \
+ if exp_or_wfid.is_a? String
+
+ exp = fetch_expression(exp_or_wfid)
+
return exp unless exp.parent_id
return fetch_root(fetch_expression(exp.parent_id))
end
#
@@ -631,31 +671,59 @@
ldebug do
"get_flow_position() " +
"found #{result.size} exps for flow #{wfid}"
end
- return result
+ result
end
+ #
+ # Lists all workflows (processes) currently in the expool (in
+ # the engine).
+ # This method will return a list of "process-definition" expressions
+ # (root of flows).
+ #
+ # If consider_subprocesses is set to true, "process-definition"
+ # expressions of subprocesses will be returned as well.
+ #
+ # "wfid_prefix" allows your to query for specific workflow instance
+ # id prefixes.
+ #
+ def list_workflows (consider_subprocesses=false, wfid_prefix=nil)
+
+ result = []
+
+ get_expression_storage.real_each do |fei, fexp|
+
+ next unless fexp.is_a? DefineExpression
+
+ next if not consider_subprocesses and fei.wfid.index(".")
+
+ next unless fei.wfid.match("^#{wfid_prefix}") if wfid_prefix
+
+ result << fexp
+ end
+
+ result
+ end
+ alias :list_processes :list_workflows
+
+ #
+ # Returns the first expression found with the given wfid.
+ #
+ def fetch_expression_with_wfid (wfid)
+
+ list_workflows(false, wfid)[0]
+ end
+
protected
#def evaluate_definition (raw_definition, workitem)
# expression = raw_definition.instantiate(workitem)
#end
#
- # Prepares the RawExpression for a 'remote' process definition
- # (a process definition pointed at via a URI).
- #
- def load_rawexp_from_uri (uri)
-
- procdef = determine_representation(uri.read)
-
- build_raw_expression(uri.to_s, procdef)
- end
-
- #
# Removes an environment, especially takes care of unbinding
# any special value it may contain.
#
def remove_environment (environment_id)
@@ -682,80 +750,81 @@
return wi
end
#
- # Given a string that contains either an XML process
- # definition, either a Ruby process definition
- # will return a 'representation' (what is used to build
- # a RawExpression instance).
+ # This is the only point in the expression pool where an URI
+ # is read, so this is where the :remote_definitions_allowed
+ # security check is enforced.
#
- def determine_representation (string_definition)
+ def read_uri (uri)
- if string_definition[0, 1] == "<"
+ uri = uri.to_s
+ uri = uri[5..-1] if uri.match("^file:")
+ uri = URI.parse(uri)
- xmlRoot = REXML::Document.new(string_definition).root
- class << xmlRoot
- def raw_expression_class
- XmlRawExpression
- end
- end
- return xmlRoot
+ if uri.scheme
+ raise "loading remote definitions is not allowed" \
+ if ac[:remote_definitions_allowed] != true
end
- # else
-
- #o = eval(string_definition)
- o = OpenWFE::eval_safely(string_definition, SAFETY_LEVEL)
- #o = OpenWFE::load_eval_safely(string_definition, SAFETY_LEVEL)
- #o = OpenWFE::eval_r_safely(string_definition, SAFETY_LEVEL)
-
- return o.do_make if o.kind_of? Class
-
- return o
+ open(uri.to_s).read
end
#
- # Extracts the process definition that a launchitem points at.
+ # The parameter to this method might be either a process
+ # definition (in any form) or a LaunchItem.
#
- def fetch_definition (launchitem)
+ # Will return a 'representation' (what is used to build
+ # a RawExpression instance).
+ #
+ def determine_representation (param)
- wfdUrl = launchitem.workflow_definition_url
+ #ldebug do
+ # "determine_representation() from class #{param.class.name}"
+ #end
- #ldebug { "wfdUrl is '#{wfdUrl}'" }
+ param = read_uri(param) if param.is_a? URI
- sDefinition = nil
- wfdField = nil
+ #ldebug do
+ # "determine_representation() " +
+ # "param of class #{param.class.name}"
+ #end
- if wfdUrl[0..5] == 'field:'
- wfdField = wfdUrl[6..-1]
- sDefinition = launchitem.attributes[wfdField]
- else
- sDefinition = URI.parse(wfdUrl).read
- end
+ return param if param.is_a? ProgExpRepresentation
+ return param.make if param.is_a? ProcessDefinition
+ return param.do_make if param.is_a? Class
- #ldebug { "sDefinition is \n#{sDefinition}" }
+ raise "cannot handle definition of class #{param.class.name}" \
+ unless param.is_a? String
- launchitem.attributes.delete(wfdField) if wfdField
+ if param[0, 1] == "<"
+ #
+ # XML definition
- return determine_representation(sDefinition) \
- if sDefinition.kind_of? String
+ xmlRoot = REXML::Document.new(param).root
+ class << xmlRoot
+ def raw_expression_class
+ XmlRawExpression
+ end
+ end
+ return xmlRoot
+ end
- return sDefinition \
- if sDefinition.kind_of? ProgExpRepresentation
+ return YAML.load(s) if param.match("^--- .")
+ #
+ # something that was dumped via YAML
- return sDefinition.make \
- if sDefinition.kind_of? ProcessDefinition
+ #
+ # else it's some ruby code to eval
- return sDefinition.do_make \
- if sDefinition.kind_of? Class
+ o = OpenWFE::eval_safely(param, SAFETY_LEVEL)
- # else...
+ return o.make if o.is_a? ProcessDefinition
+ return o.do_make if o.kind_of? Class
- raise \
- "Cannot deduce process definition " +
- "out of instance of class #{sDefinition.class}"
+ o
end
#
# Builds a FlowExpressionId instance for process being
# launched.
@@ -763,43 +832,32 @@
def new_fei (flow_url, flow_name, flow_revision, exp_name)
fei = FlowExpressionId.new
fei.owfe_version = OPENWFERU_VERSION
- fei.engine_id = get_engine.service_name
- fei.initial_engine_id = fei.engine_id
- fei.workflow_definition_url = flow_url
- fei.workflow_definition_name = flow_name
- fei.workflow_definition_revision = flow_revision
- fei.workflow_instance_id = new_workflow_instance_id()
+ fei.engine_id = OpenWFE::stu get_engine.service_name
+ fei.initial_engine_id = OpenWFE::stu fei.engine_id
+ fei.workflow_definition_url = OpenWFE::stu flow_url
+ fei.workflow_definition_name = OpenWFE::stu flow_name
+ fei.workflow_definition_revision = OpenWFE::stu flow_revision
+ fei.workflow_instance_id = get_wfid_generator.generate
fei.expression_id = "0"
fei.expression_name = exp_name
return fei
end
#
- # Returns a new unique workflow instance id
- #
- def new_workflow_instance_id ()
-
- synchronize do
-
- wfid = OpenWFE::current_time_millis()
-
- wfid = @@last_given_instance_id + 1 \
- if wfid <= @@last_given_instance_id
-
- @@last_given_instance_id = wfid
-
- return wfid.to_s
- end
- end
-
- #
# Builds the RawExpression instance at the root of the flow
# being launched.
#
- def build_raw_expression (url, procdef)
+ # The param can be a LaunchItem, an URI, anything accepted
+ # by the determine_representation() method.
+ #
+ def build_raw_expression (url, param)
+
+ procdef = determine_representation(param)
+
+ #return procdef if procdef.is_a? RawExpression
flow_name = procdef.attributes['name']
flow_revision = procdef.attributes['revision']
exp_name = procdef.name