lib/openwfe/expool/expressionpool.rb in ruote-0.9.19 vs lib/openwfe/expool/expressionpool.rb in ruote-0.9.20
- old
+ new
@@ -1,63 +1,44 @@
-#
#--
-# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org
-# All rights reserved.
+# Copyright (c) 2006-2009, John Mettraux, jmettraux@gmail.com
#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
#
-# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
#
-# . Neither the name of the "OpenWFE" nor the names of its contributors may be
-# used to endorse or promote products derived from this software without
-# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
+# Made in Japan.
#++
-#
-#
-# "made in Japan"
-#
-# John Mettraux at openwfe.org
-#
-require 'uri'
-
require 'openwfe/utils'
require 'openwfe/service'
require 'openwfe/logging'
require 'openwfe/omixins'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
require 'openwfe/util/observable'
-require 'openwfe/expool/parser'
-require 'openwfe/expool/representation'
-require 'openwfe/expool/paused_error'
+require 'openwfe/expool/errors'
require 'openwfe/expool/expool_pause_methods'
+require 'openwfe/expool/representation'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/raw'
-require 'rufus/verbs' # gem 'rufus-lru'
-
module OpenWFE
#
# The ExpressionPool stores expressions (pieces of workflow instance).
# It's the core of the workflow engine.
@@ -88,12 +69,10 @@
service_init service_name, application_context
@paused_instances = {}
- #@monitors = MonitorProvider.new(application_context)
-
@observers = {}
@stopped = false
engine_environment_id
@@ -105,147 +84,51 @@
#
def stop
@stopped = true
- onotify :stop
+ onotify(:stop)
end
- #--
- # Obtains a unique monitor for an expression.
- # It avoids the need for the FlowExpression instances to include
- # the monitor mixin by themselves
#
- #def get_monitor (fei)
- # @monitors[fei]
- #end
- #++
-
- #
- # This method is called by the launch method. It's actually the first
- # stage of that method.
- # It may be interessant to use to 'validate' a launchitem and its
- # process definition, as it will raise an exception in case
- # of 'parameter' mismatch.
- #
- # There is a 'pre_launch_check' alias for this method in the
- # Engine class.
- #
- def prepare_raw_expression (launchitem)
-
- wfdurl = launchitem.workflow_definition_url
-
- raise "launchitem.workflow_definition_url not set, cannot launch" \
- unless wfdurl
-
- definition = if wfdurl.match "^field:"
-
- raise(
- ":definition_in_launchitem_allowed not set to true, "+
- "cannot launch"
- ) if ac[:definition_in_launchitem_allowed] != true
-
- wfdfield = wfdurl[6..-1]
- launchitem.attributes.delete wfdfield
- else
-
- read_uri wfdurl
- end
-
- raise "didn't find process definition at '#{wfdurl}'" \
- unless definition
-
- raw_expression = build_raw_expression launchitem, definition
-
- raw_expression.check_parameters launchitem
- #
- # will raise an exception if there are requirements
- # and one of them is not met
-
- raw_expression
- end
-
- #
- # Instantiates a workflow definition and launches it.
- #
- # This method call will return immediately, it could even return
- # before the actual launch is completely over.
- #
- # Returns the FlowExpressionId instance of the root expression of
- # the newly launched flow.
- #
- def launch (launchitem, options={})
-
- wait = (options[:wait_for] == true)
-
- #
- # prepare raw expression
-
- raw_expression = prepare_raw_expression launchitem
- #
- # will raise an exception if there are requirements
- # and one of them is not met
-
- raw_expression = wrap_in_schedule(raw_expression, options) \
- if options.size > 0
-
- raw_expression.new_environment
- #
- # as this expression is the root of a new process instance,
- # it has to have an environment for all the variables of
- # the process instance
-
- fei = raw_expression.fei
-
- #
- # apply prepared raw expression
-
- wi = build_workitem launchitem
-
- onotify :launch, fei, launchitem
-
- if wait
- wait_for(fei) { apply raw_expression, wi }
- else
- apply raw_expression, wi
- fei
- end
- end
-
- #
# This is the first stage of the tlaunch_child() method.
#
# (it's used by the concurrent iterator when preparing all its
# iteration children)
#
- def tprepare_child (
- parent_exp, template, sub_id, register_child, vars)
+ def tprepare_child (parent_exp, template, sub_id, options={})
- return fetch_expression(template) \
- if template.is_a?(FlowExpressionId)
+ return fetch_expression(template) if template.is_a?(FlowExpressionId)
+ # used for "scheduled launches"
fei = parent_exp.fei.dup
- fei.expression_name = template.first
fei.expression_id = "#{fei.expid}.#{sub_id}"
+ fei.expression_name = template.first
+ parent_id = options[:orphan] ? nil : parent_exp.fei
+
raw_exp = RawExpression.new_raw(
- fei, nil, nil, @application_context, template)
+ fei, parent_id, nil, @application_context, template)
- raw_exp.parent_id = parent_exp.fei
-
- if vars
- raw_exp.new_environment vars
+ if vars = options[:variables]
+ raw_exp.new_environment(vars)
else
raw_exp.environment_id = parent_exp.environment_id
end
+ raw_exp.dup_environment if options[:dup_environment]
+
#workitem.fei = raw_exp.fei
# done in do_apply...
- if register_child
+ if options[:register_child] == true
+
(parent_exp.children ||= []) << raw_exp.fei
- update raw_exp
+
+ update(raw_exp)
+
+ parent_exp.store_itself unless options[:dont_store_parent]
end
raw_exp
end
@@ -256,124 +139,82 @@
# If the last, register_child, is set to true, this method will
# take care of adding the new child to the parent expression.
#
# (used by 'cron' and more)
#
- def tlaunch_child (
- parent_exp, template, sub_id, workitem, register_child, vars=nil)
+ def tlaunch_child (parent_exp, template, sub_id, workitem, opts={})
- raw_exp = tprepare_child(
- parent_exp, template, sub_id, register_child, vars)
+ raw_exp = tprepare_child(parent_exp, template, sub_id, opts)
- onotify :tlaunch_child, raw_exp.fei, workitem
+ onotify(:tlaunch_child, raw_exp.fei, workitem)
- apply raw_exp, workitem
+ apply(raw_exp, workitem)
raw_exp.fei
end
#
- # Launches a template, but makes sure the new expression has no
- # parent.
- #
- # (used by 'listen')
- #
- def tlaunch_orphan (
- firing_exp, template, sub_id, workitem, register_child)
-
- fei = firing_exp.fei.dup
- fei.expression_id = "#{fei.expid}.#{sub_id}"
- fei.expression_name = template.first
-
- raw_exp = RawExpression.new_raw(
- fei, nil, nil, @application_context, template)
-
- #raw_exp.parent_id = GONE_PARENT_ID
- raw_exp.parent_id = nil
- # it's an orphan, no parent
-
- raw_exp.environment_id = firing_exp.environment_id
- # tapping anyway into the firer's environment
-
- (firing_exp.children ||= []) << raw_exp.fei \
- if register_child
-
- onotify :tlaunch_orphan, raw_exp.fei, workitem
-
- apply raw_exp, workitem
-
- raw_exp.fei
- end
-
- #
# Launches a subprocess.
# The resulting wfid is a subid for the wfid of the firing expression.
#
- # (used by 'subprocess')
+ # (used by the 'subprocess' expression, the 'on_cancel' feature and the
+ # ProcessParticipant)
#
def launch_subprocess (
- firing_exp, template, forget, workitem, params)
+ firing_exp, template, forget, workitem, initial_variables)
- raw_exp = if template.is_a?(FlowExpressionId)
+ raw_exp = build_raw_expression(template)
- fetch_expression template
+ raw_exp.parent_id = forget ? nil : firing_exp.fei
- elsif template.is_a?(RawExpression)
+ raw_exp.fei.workflow_definition_url = firing_exp.fei.wfurl
- template.application_context = @application_context
- template
-
- else # probably an URI
-
- build_raw_expression nil, template
- end
-
- raw_exp = raw_exp.dup
- raw_exp.fei = raw_exp.fei.dup
-
- if forget
- raw_exp.parent_id = nil
- else
- raw_exp.parent_id = firing_exp.fei
- end
-
- #raw_exp.fei.wfid = get_wfid_generator.generate
- #raw_exp.fei.wfid =
- # "#{firing_exp.fei.wfid}.#{firing_exp.get_next_sub_id}"
raw_exp.fei.wfid =
"#{firing_exp.fei.parent_wfid}.#{firing_exp.get_next_sub_id}"
- raw_exp.new_environment params
+ raw_exp.new_environment(initial_variables)
raw_exp.store_itself
- apply raw_exp, workitem
+ apply(raw_exp, workitem)
raw_exp.fei
end
#
# Replaces the flow expression with a raw expression that has
# the same fei, same parent and points to the same env.
# The raw_representation will be the template.
# Stores and then apply the "cuckoo" expression.
#
+ # Used by 'exp' and 'eval' and the do_handle_error method of the expool.
+ #
def substitute_and_apply (fexp, template, workitem)
re = RawExpression.new_raw(
fexp.fei,
fexp.parent_id,
fexp.environment_id,
application_context,
template)
- update re
+ update(re)
- apply re, workitem
+ apply(re, workitem)
end
#
+ # Launches new process instance.
+ #
+ def launch (raw_exp, workitem)
+
+ onotify(:launch, raw_exp.fei, workitem)
+
+ apply(raw_exp, workitem)
+ end
+
+ #
# Applies a given expression (id or expression)
#
def apply (exp_or_fei, workitem)
get_workqueue.push(
@@ -394,24 +235,25 @@
# The param might be an expression instance or a FlowExpressionId
# instance.
#
def cancel (exp)
- exp, fei = fetch exp
+ exp, fei = fetch(exp)
unless exp
linfo { "cancel() cannot cancel missing #{fei.to_debug_s}" }
return nil
end
ldebug { "cancel() for #{fei.to_debug_s}" }
- onotify :cancel, exp
+ onotify(:cancel, exp)
wi = exp.cancel
- remove exp
+ remove(exp)
+ # will remove owned environment if any
wi
end
#
@@ -422,43 +264,45 @@
# care of removing the cancelled expression from the parent
# expression.
#
def cancel_expression (exp)
- exp = fetch_expression exp
+ exp, fei = fetch(exp)
- wi = cancel exp
+ raise "cannot cancel 'missing' expression #{fei.to_short_s}" unless exp
- # ( remember that in case of error, no wi could get returned...)
+ wi = cancel(exp)
+ # (remember that in case of error, no wi can get returned...)
+
if wi
- reply_to_parent exp, wi, false
+ reply_to_parent(exp, wi, false)
elsif exp.parent_id
- parent_exp = fetch_expression exp.parent_id
+ parent_exp = fetch_expression(exp.parent_id)
parent_exp.remove_child(exp.fei) if parent_exp
end
end
#
# Given any expression of a process, cancels the complete process
# instance.
#
def cancel_process (exp_or_wfid)
- wfid = extract_wfid exp_or_wfid, false
+ wfid = extract_wfid(exp_or_wfid, false)
# 'true' would have made sure that the parent wfid is used...
ldebug { "cancel_process() '#{wfid}'" }
- root = fetch_root wfid
+ root = fetch_root(wfid)
raise "no process to cancel '#{wfid}'" unless root
- cancel root
+ cancel(root)
end
alias :cancel_flow :cancel_process
#
# Forgets the given expression (make it an orphan).
@@ -469,37 +313,33 @@
#ldebug { "forget() forgetting #{fei}" }
return if not exp
- onotify :forget, exp
-
parent_exp.children.delete(fei)
- #exp.parent_id = GONE_PARENT_ID
exp.parent_id = nil
-
exp.dup_environment
- exp.store_itself()
+ exp.store_itself
- ldebug { "forget() forgot #{fei}" }
+ onotify(:forget, exp)
+
+ ldebug { "forget() forgot #{fei}" }
end
#
# Replies to the parent of the given expression.
#
def reply_to_parent (exp, workitem, remove=true)
- ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }
-
workitem.last_expression_id = exp.fei
- onotify :reply_to_parent, exp, workitem
+ onotify(:reply_to_parent, exp, workitem)
if remove
- remove exp
+ remove(exp)
#
# remove the expression itself
exp.clean_children
#
@@ -507,73 +347,66 @@
end
#
# manage tag, have to remove it so it can get 'redone' or 'undone'
# (preventing abuse)
+ #
+ # do the same for the on_error handler if any
- tagname = exp.attributes["tag"] if exp.attributes
-
+ tagname = exp.attributes['tag']
exp.delete_variable(tagname) if tagname
+ #exp.delete_variable(tagname) if tagname and not tagname.match(/^\//)
+ on_error = exp.attributes['on_error'] #if exp.attributes
+ exp.delete_variable(on_error) if on_error
+
#
# has raw_expression been updated ?
- track_child_raw_representation exp
+ track_child_raw_representation(exp)
#
# flow terminated ?
if (not exp.parent_id) and (exp.fei.expid == '0')
- ldebug do
- "reply_to_parent() process " +
- "#{exp.fei.workflow_instance_id} terminated"
- end
+ ldebug { "reply_to_parent() process #{exp.fei.wfid} terminated" }
- onotify :terminate, exp, workitem
+ onotify(:terminate, exp, workitem)
return
end
#
# else, gone parent ?
- if (not exp.parent_id) or (exp.parent_id.expname == 'gone')
- # this 'gone' is kept for some level of 'backward compatibility'
+ #if (not exp.parent_id) or (exp.parent_id.expname == 'gone')
+ # # this 'gone' is kept for some level of 'backward compatibility'
- ldebug do
- "reply_to_parent() parent is gone for " +
- exp.fei.to_debug_s
- end
+ if (not exp.parent_id)
+ ldebug { "reply_to_parent() parent is gone for #{exp.fei.to_debug_s}"}
return
end
#
# parent still present, reply to it
- reply exp.parent_id, workitem
+ reply(exp.parent_id, workitem)
end
#
# Adds or updates a flow expression in this pool
#
def update (flow_expression)
flow_expression.updated_at = Time.now
- ldebug { "update() for #{flow_expression.fei.to_debug_s}" }
+ #ldebug { "update() for #{flow_expression.fei.to_debug_s}" }
- #t = Timer.new
+ onotify(:update, flow_expression.fei, flow_expression)
- onotify :update, flow_expression.fei, flow_expression
-
- #ldebug do
- # "update() took #{t.duration} ms " +
- # "#{flow_expression.fei.to_debug_s}"
- #end
-
flow_expression
end
#
# Fetches a FlowExpression from the pool.
@@ -581,98 +414,68 @@
#
# The param 'exp' may be a FlowExpressionId or a FlowExpression that
# has to be reloaded.
#
def fetch (exp)
- #synchronize do
- #ldebug { "fetch() exp is of kind #{exp.class}" }
+ fei = extract_fei(exp)
- fei = if exp.is_a?(FlowExpression)
-
- exp.fei
-
- elsif not exp.is_a?(FlowExpressionId)
-
- raise \
- "Cannot fetch expression with key : "+
- "'#{fei}' (#{fei.class})"
-
- else
-
- exp
- end
-
- #ldebug { "fetch() for #{fei.to_debug_s}" }
-
[ get_expression_storage[fei], fei ]
- #end
end
#
# Fetches a FlowExpression (returns only the FlowExpression instance)
#
# The param 'exp' may be a FlowExpressionId or a FlowExpression that
# has to be reloaded.
#
def fetch_expression (exp)
- exp, fei = fetch exp
- exp
+ fetch(exp)[0]
end
#
# Returns the engine environment (the top level environment)
#
def fetch_engine_environment
- #synchronize do
- #
- # synchronize to ensure that there's 1! engine env
eei = engine_environment_id
- ee, fei = fetch eei
+ ee, fei = fetch(eei)
return ee if ee
ee = Environment.new_env(
eei, nil, nil, @application_context, nil)
ee.store_itself
ee
- #end
end
#
# Fetches the root expression of a process (or a subprocess).
#
def fetch_root (wfid)
- get_expression_storage.fetch_root wfid
+ get_expression_storage.fetch_root(wfid)
end
#
# Removes a flow expression from the pool
# (This method is mainly called from the pool itself)
#
def remove (exp)
- exp, _fei = fetch(exp) \
- if exp.is_a?(FlowExpressionId)
+ exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId)
return unless exp
- ldebug { "remove() fe #{exp.fei.to_debug_s}" }
+ #ldebug { "remove() fe #{exp.fei.to_debug_s}" }
- onotify :remove, exp.fei
+ onotify(:remove, exp.fei)
- #synchronize do
- #@monitors.delete(exp.fei)
-
- remove_environment(exp.environment_id) \
- if exp.owns_its_environment?
- #end
+ remove_environment(exp.environment_id) if exp.owns_its_environment?
end
#
# This method is called at each expool (engine) [re]start.
# It roams through the previously saved (persisted) expressions
@@ -682,21 +485,21 @@
return if @stopped
t = OpenWFE::Timer.new
- linfo { "reschedule() initiating..." }
+ linfo { 'reschedule() initiating...' }
options = { :include_classes => Rufus::Schedulable }
get_expression_storage.find_expressions(options).each do |fexp|
linfo { "reschedule() for #{fexp.fei.to_s}..." }
- onotify :reschedule, fexp.fei
+ onotify(:reschedule, fexp.fei)
- fexp.reschedule get_scheduler
+ fexp.reschedule(get_scheduler)
end
linfo { "reschedule() done. (took #{t.duration} ms)" }
end
@@ -704,117 +507,83 @@
# Returns the unique engine_environment FlowExpressionId instance.
# There is only one such environment in an engine, hence this
# 'singleton' method.
#
def engine_environment_id
- #synchronize do
- # no need, it's been already called at initialization
- return @eei if @eei
-
- @eei = FlowExpressionId.new
- @eei.owfe_version = OPENWFERU_VERSION
- @eei.engine_id = get_engine.engine_name
- @eei.initial_engine_id = @eei.engine_id
- @eei.workflow_definition_url = 'ee'
- @eei.workflow_definition_name = 'ee'
- @eei.workflow_definition_revision = '0'
- @eei.workflow_instance_id = '0'
- @eei.expression_name = EN_ENVIRONMENT
- @eei.expression_id = '0'
- @eei
- #end
+ @eei ||= new_fei(
+ :workflow_definition_url => 'ee',
+ :workflow_definition_name => 'ee',
+ :workflow_instance_id => '0',
+ :expression_name => EN_ENVIRONMENT)
end
- #
+ #--
# Returns the list of applied expressions belonging to a given
# workflow instance.
#
# If the unapplied optional parameter is set to true, all the
# expressions (even those not yet applied) that compose the process
# instance will be returned. Environments will be returned as well.
#
- def process_stack (wfid, unapplied=false)
+ #def process_stack (wfid)
+ # wfid = extract_wfid(wfid, true)
+ # params = { :parent_wfid => wfid }
+ # stack = get_expression_storage.find_expressions(params)
+ # stack.extend(RepresentationMixin)
+ # stack
+ #end
+ #++
- #raise "please provide a non-nil workflow instance id" \
- # unless wfid
-
- wfid = extract_wfid wfid, true
-
- params = {
- #:exclude_classes => [ Environment, RawExpression ],
- #:exclude_classes => [ Environment ],
- :parent_wfid => wfid
- }
- params[:applied] = true if (not unapplied)
-
- stack = get_expression_storage.find_expressions params
-
- stack.extend(RepresentationMixin) if unapplied
-
- stack
- end
-
- #
+ #--
# Lists all workflows (processes) currently in the expool (in
# the engine).
# This method will return a list of "process-definition" expressions
# (root of flows).
#
- def list_processes (options={})
+ #def list_processes (options={})
+ # options[:include_classes] = DefineExpression
+ # #
+ # # Maybe it would be better to list root expressions instead
+ # # so that expressions like 'sequence' can be used
+ # # as root expressions. Later...
+ # get_expression_storage.find_expressions(options)
+ #end
+ #++
- options[:include_classes] = DefineExpression
- #
- # Maybe it would be better to list root expressions instead
- # so that expressions like 'sequence' can be used
- # as root expressions. Later...
-
- get_expression_storage.find_expressions options
- end
-
#
# This method is called when apply() or reply() failed for
# an expression.
# There are currently only two 'users', the ParticipantExpression
# class and the do_process_workelement method of this ExpressionPool
# class.
#
- def notify_error (error, fei, message, workitem)
+ # Error handling is done here, if no handler was found, the error simply
+ # generate a notification (generally caught by an error journal).
+ #
+ def handle_error (error, fei, message, workitem)
- fei = extract_fei fei
- # densha requires that... :(
+ fei = extract_fei(fei) # just to be sure
- se = OpenWFE::exception_to_s error
-
- #onotify :error, fei, message, workitem, error.class.name, se
- onotify(:error, fei, message, workitem, error.class.name, error.to_s)
-
if error.is_a?(PausedError)
lwarn do
"#{self.service_name} " +
"operation :#{message.to_s} on #{fei.to_s} " +
"delayed because process '#{fei.wfid}' is in pause"
end
else
lwarn do
"#{self.service_name} " +
"operation :#{message.to_s} on #{fei.to_s} " +
- "failed with\n" + se
+ "failed with\n" + OpenWFE::exception_to_s(error)
end
end
- end
- #
- # Gets the process definition (if necessary) and turns into
- # into an expression tree (for storing into a RawExpression).
- #
- def determine_rep (param)
+ # notify or really handle ?
- param = read_uri(param) if param.is_a?(URI)
-
- #DefParser.parse param
- get_def_parser.parse param
+ do_handle_error(fei, workitem) ||
+ onotify(:error, fei, message, workitem, error.class.name, error.to_s)
end
#
# Returns true if the process instance to which the expression
# belongs is currently paused.
@@ -822,316 +591,351 @@
def is_paused? (expression)
(@paused_instances[expression.fei.parent_wfid] != nil)
end
- protected
+ #
+ # Builds the RawExpression instance at the root of the flow
+ # being launched.
+ #
+ # The param can be a template or a definition (or a URI).
+ #
+ def build_raw_expression (param, launchitem=nil)
- #
- # If the launch option :wait_for is set to true, this method
- # will be called to apply the raw_expression. It will only return
- # when the launched process is over, which means it terminated, it
- # had an error or it got cancelled.
- #
- def wait_for (fei_or_wfid)
+ procdef = get_def_parser.determine_rep(param)
- wfid = extract_wfid fei_or_wfid, false
+ # procdef is a nested [ name, attributes, children ] structure now
- t = Thread.current
- result = nil
+ atts = procdef[1]
- to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
- if fe.fei.workflow_instance_id == wfid
- result = [ :terminate, wi, fei_or_wfid ]
- t.wakeup
- end
+ h = {
+ :workflow_instance_id =>
+ get_wfid_generator.generate(launchitem),
+ :workflow_definition_name =>
+ atts['name'] || procdef[2].first || 'no-name',
+ :workflow_definition_revision =>
+ atts['revision'] || '0',
+ :expression_name =>
+ procdef[0]
+ }
+
+ h[:workflow_definition_url] = (
+ launchitem.workflow_definition_url || LaunchItem::FIELD_DEF
+ ) if launchitem
+
+ RawExpression.new_raw(
+ new_fei(h), nil, nil, @application_context, procdef)
+ end
+
+ #
+ # If the launch option :wait_for is set to true, this method
+ # will be called to apply the raw_expression. It will only return
+ # when the launched process is over, which means it terminated, it
+ # had an error or it got cancelled.
+ #
+ def wait_for (fei_or_wfid)
+
+ wfid = extract_wfid(fei_or_wfid, false)
+
+ t = Thread.current
+ result = nil
+
+ to = add_observer(:terminate) do |c, fe, wi|
+ if fe.fei.wfid == wfid
+ result = [ :terminate, wi, fei_or_wfid ]
+ t.wakeup
end
- te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
- if fei.parent_wfid == wfid
- result = [ :error, e, fei_or_wfid ]
- t.wakeup
- end
+ end
+ te = add_observer(:error) do |c, fei, m, i, e|
+ if fei.parent_wfid == wfid
+ result = [ :error, e, fei_or_wfid ]
+ t.wakeup
end
- tc = get_expression_pool.add_observer(:cancel) do |c, fe|
- if fe.fei.wfid == wfid and fe.fei.expid == '0'
- result = [ :cancel, wfid, fei_or_wfid ]
- t.wakeup
- end
+ end
+ tc = add_observer(:cancel) do |c, fe|
+ if fe.fei.wfid == wfid and fe.fei.expid == '0'
+ result = [ :cancel, wfid, fei_or_wfid ]
+ t.wakeup
end
+ end
- #apply raw_expression, wi
- yield if block_given?
+ yield if block_given?
- Thread.stop unless result
+ Thread.stop unless result
- linfo { "wait_for() '#{wfid}' is over" }
+ linfo { "wait_for() '#{wfid}' is over" }
- get_expression_pool.remove_observer to, :terminate
- get_expression_pool.remove_observer te, :error
- get_expression_pool.remove_observer tc, :cancel
+ remove_observer(to, :terminate)
+ remove_observer(te, :error)
+ remove_observer(tc, :cancel)
- result
- end
+ result
+ end
- #
- # This is the only point in the expression pool where an URI
- # is read, so this is where the :remote_definitions_allowed
- # security check is enforced.
- #
- def read_uri (uri)
+ protected
- uri = URI.parse uri.to_s
+ #
+ # Checks if there is an event handler available
+ #
+ def do_handle_error (fei, workitem)
- raise ":remote_definitions_allowed is set to false" \
- if (ac[:remote_definitions_allowed] != true and
- uri.scheme and
- uri.scheme != 'file')
+ fexp = fetch_expression(fei)
- #open(uri.to_s).read
+ eh_stack = fexp.lookup_variable_stack('error_handlers')
- f = Rufus::Verbs.fopen uri
- result = f.read
- f.close if f.respond_to?(:close)
+ return false if eh_stack.empty?
- result
- end
+ eh_stack.each do |env, ehandlers|
+ ehandlers.reverse.each do |ehandler|
- #
- # This is the method called [asynchronously] by the WorkQueue
- # upon apply/reply.
- #
- def do_apply_reply (direction, exp_or_fei, workitem)
+ fei, on_error = ehandler
- fei = nil
+ next unless fexp.descendant_of?(fei)
- begin
+ return false if on_error == ''
+ #
+ # blanking the 'on_error' makes the block behave like if there
+ # were no error handler at all (error is then passed to error
+ # journal usually (if there is one listening))
- exp, fei = if exp_or_fei.is_a?(FlowExpressionId)
- fetch exp_or_fei
- else
- [ exp_or_fei, exp_or_fei.fei ]
- end
+ tryexp = fetch_expression(fei)
- #p [ direction, fei.wfid, fei.expid, fei.expname ]
- #
- # I uncomment that sometimes to see how the stack
- # grows (wfids and expids)
+ # remove error handler before consuming it
- ldebug {
- ":#{direction} "+
- "target #{fei.to_debug_s}" }
+ ehandlers.delete(ehandler)
+ env.store_itself
- if not exp
+ # fetch on_error template
- #raise "apply() cannot apply missing #{_fei.to_debug_s}"
- # not very helpful anyway
+ template = (on_error == 'redo') ?
+ tryexp.raw_representation :
+ tryexp.lookup_variable(on_error) || [ on_error, {}, [] ]
- lwarn { "do_apply_reply() cannot find >#{fei}" }
+ # cancel block that is adorned with 'on_error'
- return
- end
+ environment = tryexp.owns_its_environment? ?
+ tryexp.get_environment : nil
- check_if_paused exp
+ cancel(tryexp)
- workitem.fei = exp.fei if direction == :apply
+ ldebug { "do_handle_error() on_error : '#{on_error}'" }
- onotify direction, exp, workitem
+ if on_error == 'undo'
+ #
+ # block with 'undo' error handler simply gets undone in case of
+ # error
+ #
+ reply_to_parent(tryexp, workitem, false)
+ return true
+ end
- exp.send direction, workitem
+ # switch to error handling subprocess
- rescue Exception => e
+ environment.store_itself if environment
+ #
+ # the point of error had variables, make sure they are available
+ # to the error handling block.
- notify_error e, fei, direction, workitem
+ substitute_and_apply(tryexp, template, workitem)
+
+ return true
end
end
- #
- # Will raise an exception if the expression belongs to a paused
- # process.
- #
- def check_if_paused (expression)
+ false # no error handler found
+ end
- wfid = expression.fei.parent_wfid
+ #
+ # This is the method called [asynchronously] by the WorkQueue
+ # upon apply/reply.
+ #
+ def do_apply_reply (direction, exp_or_fei, workitem)
- raise PausedError.new(wfid) if @paused_instances[wfid]
- end
+ fei = nil
- #
- # if the launch method is called with a schedule option
- # (like :at, :in, :cron and :every), this method takes care of
- # wrapping the process with a sleep or a cron.
- #
- def wrap_in_schedule (raw_expression, options)
+ begin
- oat = options[:at]
- oin = options[:in]
- ocron = options[:cron]
- oevery = options[:every]
+ exp, fei = if exp_or_fei.is_a?(FlowExpressionId)
+ fetch(exp_or_fei)
+ else
+ [ exp_or_fei, exp_or_fei.fei ]
+ end
- fei = new_fei nil, "schedlaunch", "0", "sequence"
+ #p [ direction, fei.wfid, fei.expid, fei.expname ]
+ #
+ # I uncomment that sometimes to see how the stack
+ # grows (wfids and expids)
- # not very happy with this code, it builds custom
- # wrapping processes manually, maybe there is
- # a more elegant way, but for now, it's ok.
+ if not exp
- template = if oat or oin
+ #raise "apply() cannot apply missing #{_fei.to_debug_s}"
+ # not very helpful anyway
- sleep_atts = if oat
- { "until" => oat }
- else #oin
- { "for" => oin }
- end
- sleep_atts["scheduler-tags"] = "scheduled-launch, #{fei.wfid}"
+ lwarn { "do_apply_reply() :#{direction} but cannot find #{fei}" }
- raw_expression.new_environment
- raw_expression.store_itself
+ return
+ end
- [
- "sequence", {}, [
- [ "sleep", sleep_atts, [] ],
- raw_expression.fei
- ]
- ]
+ check_if_paused(exp)
- elsif ocron or oevery
+ workitem.fei = exp.fei if direction == :apply
- fei.expression_name = "cron"
+ onotify(direction, exp, workitem)
- cron_atts = if ocron
- { "tab" => ocron }
- else #oevery
- { "every" => oevery }
- end
- cron_atts["name"] = "//cron_launch__#{fei.wfid}"
- cron_atts["scheduler-tags"] = "scheduled-launch, #{fei.wfid}"
+ exp.send(direction, workitem)
- template = raw_expression.raw_representation
- remove raw_expression
+ rescue Exception => e
- [ "cron", cron_atts, [ template ] ]
+ handle_error(e, fei, direction, workitem)
+ end
+ end
- else
+ #
+ # Will raise an exception if the expression belongs to a paused
+ # process.
+ #
+ def check_if_paused (expression)
- nil # don't schedule at all
- end
+ wfid = expression.fei.parent_wfid
- if template
+ raise PausedError.new(wfid) if @paused_instances[wfid]
+ end
- raw_exp = RawExpression.new_raw(
- fei, nil, nil, @application_context, template)
+ #
+ # if the launch method is called with a schedule option
+ # (like :at, :in, :cron and :every), this method takes care of
+ # wrapping the process with a sleep or a cron.
+ #
+ def wrap_in_schedule (raw_expression, options)
- raw_exp.store_itself
+ oat = options[:at]
+ oin = options[:in]
+ ocron = options[:cron]
+ oevery = options[:every]
- raw_exp
- else
+ fei = new_fei(
+ :workflow_instance_id => get_wfid_generator.generate(nil),
+ :workflow_definition_name => 'schedlaunch',
+ :expression_name => 'sequence')
- raw_expression
+ # not very happy with this code, it builds custom
+ # wrapping processes manually, maybe there is
+ # a more elegant way, but for now, it's ok.
+
+ template = if oat or oin
+
+ sleep_atts = if oat
+ { 'until' => oat }
+ else #oin
+ { 'for' => oin }
end
- end
+ sleep_atts['scheduler-tags'] = "scheduled-launch, #{fei.wfid}"
- #
- # Removes an environment, especially takes care of unbinding
- # any special value it may contain.
- #
- def remove_environment (environment_id)
+ raw_expression.new_environment
+ raw_expression.store_itself
- ldebug { "remove_environment() #{environment_id.to_debug_s}" }
+ [
+ 'sequence', {}, [
+ [ 'sleep', sleep_atts, [] ],
+ raw_expression.fei
+ ]
+ ]
- env, fei = fetch(environment_id)
+ elsif ocron or oevery
- return unless env
- #
- # env already unbound and removed
+ fei.expression_name = 'cron'
- env.unbind
+ cron_atts = if ocron
+ { 'tab' => ocron }
+ else #oevery
+ { 'every' => oevery }
+ end
+ cron_atts['name'] = "//cron_launch__#{fei.wfid}"
+ cron_atts['scheduler-tags'] = "scheduled-launch, #{fei.wfid}"
- #get_expression_storage().delete(environment_id)
+ template = raw_expression.raw_representation
+ remove(raw_expression)
- onotify :remove, environment_id
+ [ 'cron', cron_atts, [ template ] ]
+
+ else
+
+ nil # don't schedule at all
end
- #
- # Prepares a new instance of InFlowWorkItem from a LaunchItem
- # instance.
- #
- def build_workitem (launchitem)
+ if template
- wi = InFlowWorkItem.new
+ raw_exp = RawExpression.new_raw(
+ fei, nil, nil, @application_context, template)
- wi.attributes = launchitem.attributes.dup
+ #raw_exp.store_itself
+ raw_exp.new_environment
- wi
+ raw_exp
+ else
+
+ raw_expression
end
+ end
- #
- # Builds a FlowExpressionId instance for a process being
- # launched.
- #
- def new_fei (launchitem, flow_name, flow_revision, exp_name)
+ #
+ # Removes an environment, especially takes care of unbinding
+ # any special value it may contain.
+ #
+ def remove_environment (environment_id)
- url = if launchitem
- launchitem.workflow_definition_url
- else
- "no-url"
- end
+ #ldebug { "remove_environment() #{environment_id.to_debug_s}" }
- fei = FlowExpressionId.new
+ env, fei = fetch(environment_id)
- fei.owfe_version = OPENWFERU_VERSION
- fei.engine_id = OpenWFE::stu get_engine.service_name.to_s
- fei.initial_engine_id = OpenWFE::stu fei.engine_id
- fei.workflow_definition_url = OpenWFE::stu url
- fei.workflow_definition_name = OpenWFE::stu flow_name
- fei.workflow_definition_revision = OpenWFE::stu flow_revision
- fei.wfid = get_wfid_generator.generate launchitem
- fei.expression_id = "0"
- fei.expression_name = exp_name
+ return unless env
+ #
+ # env already unbound and removed
- fei
- end
+ env.unbind
- #
- # Builds the RawExpression instance at the root of the flow
- # being launched.
- #
- # The param can be a template or a definition (anything
- # accepted by the determine_representation() method).
- #
- def build_raw_expression (launchitem, param)
+ onotify(:remove, environment_id)
+ end
- procdef = determine_rep param
+ #
+ # Builds a FlowExpressionId instance for a process being
+ # launched.
+ #
+ def new_fei (h)
- atts = procdef[1]
- flow_name = atts['name'] || "noname"
- flow_revision = atts['revision'] || "0"
- exp_name = procdef.first
+ h[:engine_id] = OpenWFE::stu(get_engine.engine_name)
- fei = new_fei launchitem, flow_name, flow_revision, exp_name
+ %w{ url name revision }.each { |k| stu(h, k) }
- RawExpression.new_raw(
- fei, nil, nil, @application_context, procdef)
- end
+ FlowExpressionId.new_fei(h)
+ end
- #
- # Given a [replying] child flow expression, will update its parent
- # raw expression if the child raw_expression changed.
- #
- # This is used to keep track of in-flight modification to running
- # process instances.
- #
- def track_child_raw_representation (fexp)
+ def stu (h, key)
- return unless fexp.raw_rep_updated == true
+ key = "workflow_definition_#{key}".intern
+ v = h[key]
+ h[key] = OpenWFE::stu(v.to_s) if v
+ end
- parent = fetch_expression fexp.parent_id
+ #
+ # Given a [replying] child flow expression, will update its parent
+ # raw expression if the child raw_expression changed.
+ #
+ # This is used to keep track of in-flight modification to running
+ # process instances.
+ #
+ def track_child_raw_representation (fexp)
- return if parent.class.uses_template?
+ return unless fexp.raw_rep_updated == true
- parent.raw_children[fexp.fei.child_id.to_i] =
- fexp.raw_representation
+ parent = fetch_expression(fexp.parent_id)
- parent.store_itself
- end
+ #p [ :storing, fexp.raw_representation, fexp.fei.to_short_s ]
+
+ parent.raw_children[fexp.fei.child_id.to_i] = fexp.raw_representation
+
+ parent.store_itself
+ end
end
end