lib/openwfe/expool/expressionpool.rb in ruote-0.9.19 vs lib/openwfe/expool/expressionpool.rb in ruote-0.9.20

- old
+ new

@@ -1,63 +1,44 @@ -# #-- -# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org -# All rights reserved. +# Copyright (c) 2006-2009, John Mettraux, jmettraux@gmail.com # -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: # -# . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. # -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. # -# . Neither the name of the "OpenWFE" nor the names of its contributors may be -# used to endorse or promote products derived from this software without -# specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. +# Made in Japan. #++ -# -# -# "made in Japan" -# -# John Mettraux at openwfe.org -# -require 'uri' - require 'openwfe/utils' require 'openwfe/service' require 'openwfe/logging' require 'openwfe/omixins' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' require 'openwfe/util/observable' -require 'openwfe/expool/parser' -require 'openwfe/expool/representation' -require 'openwfe/expool/paused_error' +require 'openwfe/expool/errors' require 'openwfe/expool/expool_pause_methods' +require 'openwfe/expool/representation' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw' -require 'rufus/verbs' # gem 'rufus-lru' - module OpenWFE # # The ExpressionPool stores expressions (pieces of workflow instance). # It's the core of the workflow engine. @@ -88,12 +69,10 @@ service_init service_name, application_context @paused_instances = {} - #@monitors = MonitorProvider.new(application_context) - @observers = {} @stopped = false engine_environment_id @@ -105,147 +84,51 @@ # def stop @stopped = true - onotify :stop + onotify(:stop) end - #-- - # Obtains a unique monitor for an expression. - # It avoids the need for the FlowExpression instances to include - # the monitor mixin by themselves # - #def get_monitor (fei) - # @monitors[fei] - #end - #++ - - # - # This method is called by the launch method. It's actually the first - # stage of that method. - # It may be interessant to use to 'validate' a launchitem and its - # process definition, as it will raise an exception in case - # of 'parameter' mismatch. - # - # There is a 'pre_launch_check' alias for this method in the - # Engine class. - # - def prepare_raw_expression (launchitem) - - wfdurl = launchitem.workflow_definition_url - - raise "launchitem.workflow_definition_url not set, cannot launch" \ - unless wfdurl - - definition = if wfdurl.match "^field:" - - raise( - ":definition_in_launchitem_allowed not set to true, "+ - "cannot launch" - ) if ac[:definition_in_launchitem_allowed] != true - - wfdfield = wfdurl[6..-1] - launchitem.attributes.delete wfdfield - else - - read_uri wfdurl - end - - raise "didn't find process definition at '#{wfdurl}'" \ - unless definition - - raw_expression = build_raw_expression launchitem, definition - - raw_expression.check_parameters launchitem - # - # will raise an exception if there are requirements - # and one of them is not met - - raw_expression - end - - # - # Instantiates a workflow definition and launches it. - # - # This method call will return immediately, it could even return - # before the actual launch is completely over. - # - # Returns the FlowExpressionId instance of the root expression of - # the newly launched flow. - # - def launch (launchitem, options={}) - - wait = (options[:wait_for] == true) - - # - # prepare raw expression - - raw_expression = prepare_raw_expression launchitem - # - # will raise an exception if there are requirements - # and one of them is not met - - raw_expression = wrap_in_schedule(raw_expression, options) \ - if options.size > 0 - - raw_expression.new_environment - # - # as this expression is the root of a new process instance, - # it has to have an environment for all the variables of - # the process instance - - fei = raw_expression.fei - - # - # apply prepared raw expression - - wi = build_workitem launchitem - - onotify :launch, fei, launchitem - - if wait - wait_for(fei) { apply raw_expression, wi } - else - apply raw_expression, wi - fei - end - end - - # # This is the first stage of the tlaunch_child() method. # # (it's used by the concurrent iterator when preparing all its # iteration children) # - def tprepare_child ( - parent_exp, template, sub_id, register_child, vars) + def tprepare_child (parent_exp, template, sub_id, options={}) - return fetch_expression(template) \ - if template.is_a?(FlowExpressionId) + return fetch_expression(template) if template.is_a?(FlowExpressionId) + # used for "scheduled launches" fei = parent_exp.fei.dup - fei.expression_name = template.first fei.expression_id = "#{fei.expid}.#{sub_id}" + fei.expression_name = template.first + parent_id = options[:orphan] ? nil : parent_exp.fei + raw_exp = RawExpression.new_raw( - fei, nil, nil, @application_context, template) + fei, parent_id, nil, @application_context, template) - raw_exp.parent_id = parent_exp.fei - - if vars - raw_exp.new_environment vars + if vars = options[:variables] + raw_exp.new_environment(vars) else raw_exp.environment_id = parent_exp.environment_id end + raw_exp.dup_environment if options[:dup_environment] + #workitem.fei = raw_exp.fei # done in do_apply... - if register_child + if options[:register_child] == true + (parent_exp.children ||= []) << raw_exp.fei - update raw_exp + + update(raw_exp) + + parent_exp.store_itself unless options[:dont_store_parent] end raw_exp end @@ -256,124 +139,82 @@ # If the last, register_child, is set to true, this method will # take care of adding the new child to the parent expression. # # (used by 'cron' and more) # - def tlaunch_child ( - parent_exp, template, sub_id, workitem, register_child, vars=nil) + def tlaunch_child (parent_exp, template, sub_id, workitem, opts={}) - raw_exp = tprepare_child( - parent_exp, template, sub_id, register_child, vars) + raw_exp = tprepare_child(parent_exp, template, sub_id, opts) - onotify :tlaunch_child, raw_exp.fei, workitem + onotify(:tlaunch_child, raw_exp.fei, workitem) - apply raw_exp, workitem + apply(raw_exp, workitem) raw_exp.fei end # - # Launches a template, but makes sure the new expression has no - # parent. - # - # (used by 'listen') - # - def tlaunch_orphan ( - firing_exp, template, sub_id, workitem, register_child) - - fei = firing_exp.fei.dup - fei.expression_id = "#{fei.expid}.#{sub_id}" - fei.expression_name = template.first - - raw_exp = RawExpression.new_raw( - fei, nil, nil, @application_context, template) - - #raw_exp.parent_id = GONE_PARENT_ID - raw_exp.parent_id = nil - # it's an orphan, no parent - - raw_exp.environment_id = firing_exp.environment_id - # tapping anyway into the firer's environment - - (firing_exp.children ||= []) << raw_exp.fei \ - if register_child - - onotify :tlaunch_orphan, raw_exp.fei, workitem - - apply raw_exp, workitem - - raw_exp.fei - end - - # # Launches a subprocess. # The resulting wfid is a subid for the wfid of the firing expression. # - # (used by 'subprocess') + # (used by the 'subprocess' expression, the 'on_cancel' feature and the + # ProcessParticipant) # def launch_subprocess ( - firing_exp, template, forget, workitem, params) + firing_exp, template, forget, workitem, initial_variables) - raw_exp = if template.is_a?(FlowExpressionId) + raw_exp = build_raw_expression(template) - fetch_expression template + raw_exp.parent_id = forget ? nil : firing_exp.fei - elsif template.is_a?(RawExpression) + raw_exp.fei.workflow_definition_url = firing_exp.fei.wfurl - template.application_context = @application_context - template - - else # probably an URI - - build_raw_expression nil, template - end - - raw_exp = raw_exp.dup - raw_exp.fei = raw_exp.fei.dup - - if forget - raw_exp.parent_id = nil - else - raw_exp.parent_id = firing_exp.fei - end - - #raw_exp.fei.wfid = get_wfid_generator.generate - #raw_exp.fei.wfid = - # "#{firing_exp.fei.wfid}.#{firing_exp.get_next_sub_id}" raw_exp.fei.wfid = "#{firing_exp.fei.parent_wfid}.#{firing_exp.get_next_sub_id}" - raw_exp.new_environment params + raw_exp.new_environment(initial_variables) raw_exp.store_itself - apply raw_exp, workitem + apply(raw_exp, workitem) raw_exp.fei end # # Replaces the flow expression with a raw expression that has # the same fei, same parent and points to the same env. # The raw_representation will be the template. # Stores and then apply the "cuckoo" expression. # + # Used by 'exp' and 'eval' and the do_handle_error method of the expool. + # def substitute_and_apply (fexp, template, workitem) re = RawExpression.new_raw( fexp.fei, fexp.parent_id, fexp.environment_id, application_context, template) - update re + update(re) - apply re, workitem + apply(re, workitem) end # + # Launches new process instance. + # + def launch (raw_exp, workitem) + + onotify(:launch, raw_exp.fei, workitem) + + apply(raw_exp, workitem) + end + + # # Applies a given expression (id or expression) # def apply (exp_or_fei, workitem) get_workqueue.push( @@ -394,24 +235,25 @@ # The param might be an expression instance or a FlowExpressionId # instance. # def cancel (exp) - exp, fei = fetch exp + exp, fei = fetch(exp) unless exp linfo { "cancel() cannot cancel missing #{fei.to_debug_s}" } return nil end ldebug { "cancel() for #{fei.to_debug_s}" } - onotify :cancel, exp + onotify(:cancel, exp) wi = exp.cancel - remove exp + remove(exp) + # will remove owned environment if any wi end # @@ -422,43 +264,45 @@ # care of removing the cancelled expression from the parent # expression. # def cancel_expression (exp) - exp = fetch_expression exp + exp, fei = fetch(exp) - wi = cancel exp + raise "cannot cancel 'missing' expression #{fei.to_short_s}" unless exp - # ( remember that in case of error, no wi could get returned...) + wi = cancel(exp) + # (remember that in case of error, no wi can get returned...) + if wi - reply_to_parent exp, wi, false + reply_to_parent(exp, wi, false) elsif exp.parent_id - parent_exp = fetch_expression exp.parent_id + parent_exp = fetch_expression(exp.parent_id) parent_exp.remove_child(exp.fei) if parent_exp end end # # Given any expression of a process, cancels the complete process # instance. # def cancel_process (exp_or_wfid) - wfid = extract_wfid exp_or_wfid, false + wfid = extract_wfid(exp_or_wfid, false) # 'true' would have made sure that the parent wfid is used... ldebug { "cancel_process() '#{wfid}'" } - root = fetch_root wfid + root = fetch_root(wfid) raise "no process to cancel '#{wfid}'" unless root - cancel root + cancel(root) end alias :cancel_flow :cancel_process # # Forgets the given expression (make it an orphan). @@ -469,37 +313,33 @@ #ldebug { "forget() forgetting #{fei}" } return if not exp - onotify :forget, exp - parent_exp.children.delete(fei) - #exp.parent_id = GONE_PARENT_ID exp.parent_id = nil - exp.dup_environment - exp.store_itself() + exp.store_itself - ldebug { "forget() forgot #{fei}" } + onotify(:forget, exp) + + ldebug { "forget() forgot #{fei}" } end # # Replies to the parent of the given expression. # def reply_to_parent (exp, workitem, remove=true) - ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } - workitem.last_expression_id = exp.fei - onotify :reply_to_parent, exp, workitem + onotify(:reply_to_parent, exp, workitem) if remove - remove exp + remove(exp) # # remove the expression itself exp.clean_children # @@ -507,73 +347,66 @@ end # # manage tag, have to remove it so it can get 'redone' or 'undone' # (preventing abuse) + # + # do the same for the on_error handler if any - tagname = exp.attributes["tag"] if exp.attributes - + tagname = exp.attributes['tag'] exp.delete_variable(tagname) if tagname + #exp.delete_variable(tagname) if tagname and not tagname.match(/^\//) + on_error = exp.attributes['on_error'] #if exp.attributes + exp.delete_variable(on_error) if on_error + # # has raw_expression been updated ? - track_child_raw_representation exp + track_child_raw_representation(exp) # # flow terminated ? if (not exp.parent_id) and (exp.fei.expid == '0') - ldebug do - "reply_to_parent() process " + - "#{exp.fei.workflow_instance_id} terminated" - end + ldebug { "reply_to_parent() process #{exp.fei.wfid} terminated" } - onotify :terminate, exp, workitem + onotify(:terminate, exp, workitem) return end # # else, gone parent ? - if (not exp.parent_id) or (exp.parent_id.expname == 'gone') - # this 'gone' is kept for some level of 'backward compatibility' + #if (not exp.parent_id) or (exp.parent_id.expname == 'gone') + # # this 'gone' is kept for some level of 'backward compatibility' - ldebug do - "reply_to_parent() parent is gone for " + - exp.fei.to_debug_s - end + if (not exp.parent_id) + ldebug { "reply_to_parent() parent is gone for #{exp.fei.to_debug_s}"} return end # # parent still present, reply to it - reply exp.parent_id, workitem + reply(exp.parent_id, workitem) end # # Adds or updates a flow expression in this pool # def update (flow_expression) flow_expression.updated_at = Time.now - ldebug { "update() for #{flow_expression.fei.to_debug_s}" } + #ldebug { "update() for #{flow_expression.fei.to_debug_s}" } - #t = Timer.new + onotify(:update, flow_expression.fei, flow_expression) - onotify :update, flow_expression.fei, flow_expression - - #ldebug do - # "update() took #{t.duration} ms " + - # "#{flow_expression.fei.to_debug_s}" - #end - flow_expression end # # Fetches a FlowExpression from the pool. @@ -581,98 +414,68 @@ # # The param 'exp' may be a FlowExpressionId or a FlowExpression that # has to be reloaded. # def fetch (exp) - #synchronize do - #ldebug { "fetch() exp is of kind #{exp.class}" } + fei = extract_fei(exp) - fei = if exp.is_a?(FlowExpression) - - exp.fei - - elsif not exp.is_a?(FlowExpressionId) - - raise \ - "Cannot fetch expression with key : "+ - "'#{fei}' (#{fei.class})" - - else - - exp - end - - #ldebug { "fetch() for #{fei.to_debug_s}" } - [ get_expression_storage[fei], fei ] - #end end # # Fetches a FlowExpression (returns only the FlowExpression instance) # # The param 'exp' may be a FlowExpressionId or a FlowExpression that # has to be reloaded. # def fetch_expression (exp) - exp, fei = fetch exp - exp + fetch(exp)[0] end # # Returns the engine environment (the top level environment) # def fetch_engine_environment - #synchronize do - # - # synchronize to ensure that there's 1! engine env eei = engine_environment_id - ee, fei = fetch eei + ee, fei = fetch(eei) return ee if ee ee = Environment.new_env( eei, nil, nil, @application_context, nil) ee.store_itself ee - #end end # # Fetches the root expression of a process (or a subprocess). # def fetch_root (wfid) - get_expression_storage.fetch_root wfid + get_expression_storage.fetch_root(wfid) end # # Removes a flow expression from the pool # (This method is mainly called from the pool itself) # def remove (exp) - exp, _fei = fetch(exp) \ - if exp.is_a?(FlowExpressionId) + exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId) return unless exp - ldebug { "remove() fe #{exp.fei.to_debug_s}" } + #ldebug { "remove() fe #{exp.fei.to_debug_s}" } - onotify :remove, exp.fei + onotify(:remove, exp.fei) - #synchronize do - #@monitors.delete(exp.fei) - - remove_environment(exp.environment_id) \ - if exp.owns_its_environment? - #end + remove_environment(exp.environment_id) if exp.owns_its_environment? end # # This method is called at each expool (engine) [re]start. # It roams through the previously saved (persisted) expressions @@ -682,21 +485,21 @@ return if @stopped t = OpenWFE::Timer.new - linfo { "reschedule() initiating..." } + linfo { 'reschedule() initiating...' } options = { :include_classes => Rufus::Schedulable } get_expression_storage.find_expressions(options).each do |fexp| linfo { "reschedule() for #{fexp.fei.to_s}..." } - onotify :reschedule, fexp.fei + onotify(:reschedule, fexp.fei) - fexp.reschedule get_scheduler + fexp.reschedule(get_scheduler) end linfo { "reschedule() done. (took #{t.duration} ms)" } end @@ -704,117 +507,83 @@ # Returns the unique engine_environment FlowExpressionId instance. # There is only one such environment in an engine, hence this # 'singleton' method. # def engine_environment_id - #synchronize do - # no need, it's been already called at initialization - return @eei if @eei - - @eei = FlowExpressionId.new - @eei.owfe_version = OPENWFERU_VERSION - @eei.engine_id = get_engine.engine_name - @eei.initial_engine_id = @eei.engine_id - @eei.workflow_definition_url = 'ee' - @eei.workflow_definition_name = 'ee' - @eei.workflow_definition_revision = '0' - @eei.workflow_instance_id = '0' - @eei.expression_name = EN_ENVIRONMENT - @eei.expression_id = '0' - @eei - #end + @eei ||= new_fei( + :workflow_definition_url => 'ee', + :workflow_definition_name => 'ee', + :workflow_instance_id => '0', + :expression_name => EN_ENVIRONMENT) end - # + #-- # Returns the list of applied expressions belonging to a given # workflow instance. # # If the unapplied optional parameter is set to true, all the # expressions (even those not yet applied) that compose the process # instance will be returned. Environments will be returned as well. # - def process_stack (wfid, unapplied=false) + #def process_stack (wfid) + # wfid = extract_wfid(wfid, true) + # params = { :parent_wfid => wfid } + # stack = get_expression_storage.find_expressions(params) + # stack.extend(RepresentationMixin) + # stack + #end + #++ - #raise "please provide a non-nil workflow instance id" \ - # unless wfid - - wfid = extract_wfid wfid, true - - params = { - #:exclude_classes => [ Environment, RawExpression ], - #:exclude_classes => [ Environment ], - :parent_wfid => wfid - } - params[:applied] = true if (not unapplied) - - stack = get_expression_storage.find_expressions params - - stack.extend(RepresentationMixin) if unapplied - - stack - end - - # + #-- # Lists all workflows (processes) currently in the expool (in # the engine). # This method will return a list of "process-definition" expressions # (root of flows). # - def list_processes (options={}) + #def list_processes (options={}) + # options[:include_classes] = DefineExpression + # # + # # Maybe it would be better to list root expressions instead + # # so that expressions like 'sequence' can be used + # # as root expressions. Later... + # get_expression_storage.find_expressions(options) + #end + #++ - options[:include_classes] = DefineExpression - # - # Maybe it would be better to list root expressions instead - # so that expressions like 'sequence' can be used - # as root expressions. Later... - - get_expression_storage.find_expressions options - end - # # This method is called when apply() or reply() failed for # an expression. # There are currently only two 'users', the ParticipantExpression # class and the do_process_workelement method of this ExpressionPool # class. # - def notify_error (error, fei, message, workitem) + # Error handling is done here, if no handler was found, the error simply + # generate a notification (generally caught by an error journal). + # + def handle_error (error, fei, message, workitem) - fei = extract_fei fei - # densha requires that... :( + fei = extract_fei(fei) # just to be sure - se = OpenWFE::exception_to_s error - - #onotify :error, fei, message, workitem, error.class.name, se - onotify(:error, fei, message, workitem, error.class.name, error.to_s) - if error.is_a?(PausedError) lwarn do "#{self.service_name} " + "operation :#{message.to_s} on #{fei.to_s} " + "delayed because process '#{fei.wfid}' is in pause" end else lwarn do "#{self.service_name} " + "operation :#{message.to_s} on #{fei.to_s} " + - "failed with\n" + se + "failed with\n" + OpenWFE::exception_to_s(error) end end - end - # - # Gets the process definition (if necessary) and turns into - # into an expression tree (for storing into a RawExpression). - # - def determine_rep (param) + # notify or really handle ? - param = read_uri(param) if param.is_a?(URI) - - #DefParser.parse param - get_def_parser.parse param + do_handle_error(fei, workitem) || + onotify(:error, fei, message, workitem, error.class.name, error.to_s) end # # Returns true if the process instance to which the expression # belongs is currently paused. @@ -822,316 +591,351 @@ def is_paused? (expression) (@paused_instances[expression.fei.parent_wfid] != nil) end - protected + # + # Builds the RawExpression instance at the root of the flow + # being launched. + # + # The param can be a template or a definition (or a URI). + # + def build_raw_expression (param, launchitem=nil) - # - # If the launch option :wait_for is set to true, this method - # will be called to apply the raw_expression. It will only return - # when the launched process is over, which means it terminated, it - # had an error or it got cancelled. - # - def wait_for (fei_or_wfid) + procdef = get_def_parser.determine_rep(param) - wfid = extract_wfid fei_or_wfid, false + # procdef is a nested [ name, attributes, children ] structure now - t = Thread.current - result = nil + atts = procdef[1] - to = get_expression_pool.add_observer(:terminate) do |c, fe, wi| - if fe.fei.workflow_instance_id == wfid - result = [ :terminate, wi, fei_or_wfid ] - t.wakeup - end + h = { + :workflow_instance_id => + get_wfid_generator.generate(launchitem), + :workflow_definition_name => + atts['name'] || procdef[2].first || 'no-name', + :workflow_definition_revision => + atts['revision'] || '0', + :expression_name => + procdef[0] + } + + h[:workflow_definition_url] = ( + launchitem.workflow_definition_url || LaunchItem::FIELD_DEF + ) if launchitem + + RawExpression.new_raw( + new_fei(h), nil, nil, @application_context, procdef) + end + + # + # If the launch option :wait_for is set to true, this method + # will be called to apply the raw_expression. It will only return + # when the launched process is over, which means it terminated, it + # had an error or it got cancelled. + # + def wait_for (fei_or_wfid) + + wfid = extract_wfid(fei_or_wfid, false) + + t = Thread.current + result = nil + + to = add_observer(:terminate) do |c, fe, wi| + if fe.fei.wfid == wfid + result = [ :terminate, wi, fei_or_wfid ] + t.wakeup end - te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e| - if fei.parent_wfid == wfid - result = [ :error, e, fei_or_wfid ] - t.wakeup - end + end + te = add_observer(:error) do |c, fei, m, i, e| + if fei.parent_wfid == wfid + result = [ :error, e, fei_or_wfid ] + t.wakeup end - tc = get_expression_pool.add_observer(:cancel) do |c, fe| - if fe.fei.wfid == wfid and fe.fei.expid == '0' - result = [ :cancel, wfid, fei_or_wfid ] - t.wakeup - end + end + tc = add_observer(:cancel) do |c, fe| + if fe.fei.wfid == wfid and fe.fei.expid == '0' + result = [ :cancel, wfid, fei_or_wfid ] + t.wakeup end + end - #apply raw_expression, wi - yield if block_given? + yield if block_given? - Thread.stop unless result + Thread.stop unless result - linfo { "wait_for() '#{wfid}' is over" } + linfo { "wait_for() '#{wfid}' is over" } - get_expression_pool.remove_observer to, :terminate - get_expression_pool.remove_observer te, :error - get_expression_pool.remove_observer tc, :cancel + remove_observer(to, :terminate) + remove_observer(te, :error) + remove_observer(tc, :cancel) - result - end + result + end - # - # This is the only point in the expression pool where an URI - # is read, so this is where the :remote_definitions_allowed - # security check is enforced. - # - def read_uri (uri) + protected - uri = URI.parse uri.to_s + # + # Checks if there is an event handler available + # + def do_handle_error (fei, workitem) - raise ":remote_definitions_allowed is set to false" \ - if (ac[:remote_definitions_allowed] != true and - uri.scheme and - uri.scheme != 'file') + fexp = fetch_expression(fei) - #open(uri.to_s).read + eh_stack = fexp.lookup_variable_stack('error_handlers') - f = Rufus::Verbs.fopen uri - result = f.read - f.close if f.respond_to?(:close) + return false if eh_stack.empty? - result - end + eh_stack.each do |env, ehandlers| + ehandlers.reverse.each do |ehandler| - # - # This is the method called [asynchronously] by the WorkQueue - # upon apply/reply. - # - def do_apply_reply (direction, exp_or_fei, workitem) + fei, on_error = ehandler - fei = nil + next unless fexp.descendant_of?(fei) - begin + return false if on_error == '' + # + # blanking the 'on_error' makes the block behave like if there + # were no error handler at all (error is then passed to error + # journal usually (if there is one listening)) - exp, fei = if exp_or_fei.is_a?(FlowExpressionId) - fetch exp_or_fei - else - [ exp_or_fei, exp_or_fei.fei ] - end + tryexp = fetch_expression(fei) - #p [ direction, fei.wfid, fei.expid, fei.expname ] - # - # I uncomment that sometimes to see how the stack - # grows (wfids and expids) + # remove error handler before consuming it - ldebug { - ":#{direction} "+ - "target #{fei.to_debug_s}" } + ehandlers.delete(ehandler) + env.store_itself - if not exp + # fetch on_error template - #raise "apply() cannot apply missing #{_fei.to_debug_s}" - # not very helpful anyway + template = (on_error == 'redo') ? + tryexp.raw_representation : + tryexp.lookup_variable(on_error) || [ on_error, {}, [] ] - lwarn { "do_apply_reply() cannot find >#{fei}" } + # cancel block that is adorned with 'on_error' - return - end + environment = tryexp.owns_its_environment? ? + tryexp.get_environment : nil - check_if_paused exp + cancel(tryexp) - workitem.fei = exp.fei if direction == :apply + ldebug { "do_handle_error() on_error : '#{on_error}'" } - onotify direction, exp, workitem + if on_error == 'undo' + # + # block with 'undo' error handler simply gets undone in case of + # error + # + reply_to_parent(tryexp, workitem, false) + return true + end - exp.send direction, workitem + # switch to error handling subprocess - rescue Exception => e + environment.store_itself if environment + # + # the point of error had variables, make sure they are available + # to the error handling block. - notify_error e, fei, direction, workitem + substitute_and_apply(tryexp, template, workitem) + + return true end end - # - # Will raise an exception if the expression belongs to a paused - # process. - # - def check_if_paused (expression) + false # no error handler found + end - wfid = expression.fei.parent_wfid + # + # This is the method called [asynchronously] by the WorkQueue + # upon apply/reply. + # + def do_apply_reply (direction, exp_or_fei, workitem) - raise PausedError.new(wfid) if @paused_instances[wfid] - end + fei = nil - # - # if the launch method is called with a schedule option - # (like :at, :in, :cron and :every), this method takes care of - # wrapping the process with a sleep or a cron. - # - def wrap_in_schedule (raw_expression, options) + begin - oat = options[:at] - oin = options[:in] - ocron = options[:cron] - oevery = options[:every] + exp, fei = if exp_or_fei.is_a?(FlowExpressionId) + fetch(exp_or_fei) + else + [ exp_or_fei, exp_or_fei.fei ] + end - fei = new_fei nil, "schedlaunch", "0", "sequence" + #p [ direction, fei.wfid, fei.expid, fei.expname ] + # + # I uncomment that sometimes to see how the stack + # grows (wfids and expids) - # not very happy with this code, it builds custom - # wrapping processes manually, maybe there is - # a more elegant way, but for now, it's ok. + if not exp - template = if oat or oin + #raise "apply() cannot apply missing #{_fei.to_debug_s}" + # not very helpful anyway - sleep_atts = if oat - { "until" => oat } - else #oin - { "for" => oin } - end - sleep_atts["scheduler-tags"] = "scheduled-launch, #{fei.wfid}" + lwarn { "do_apply_reply() :#{direction} but cannot find #{fei}" } - raw_expression.new_environment - raw_expression.store_itself + return + end - [ - "sequence", {}, [ - [ "sleep", sleep_atts, [] ], - raw_expression.fei - ] - ] + check_if_paused(exp) - elsif ocron or oevery + workitem.fei = exp.fei if direction == :apply - fei.expression_name = "cron" + onotify(direction, exp, workitem) - cron_atts = if ocron - { "tab" => ocron } - else #oevery - { "every" => oevery } - end - cron_atts["name"] = "//cron_launch__#{fei.wfid}" - cron_atts["scheduler-tags"] = "scheduled-launch, #{fei.wfid}" + exp.send(direction, workitem) - template = raw_expression.raw_representation - remove raw_expression + rescue Exception => e - [ "cron", cron_atts, [ template ] ] + handle_error(e, fei, direction, workitem) + end + end - else + # + # Will raise an exception if the expression belongs to a paused + # process. + # + def check_if_paused (expression) - nil # don't schedule at all - end + wfid = expression.fei.parent_wfid - if template + raise PausedError.new(wfid) if @paused_instances[wfid] + end - raw_exp = RawExpression.new_raw( - fei, nil, nil, @application_context, template) + # + # if the launch method is called with a schedule option + # (like :at, :in, :cron and :every), this method takes care of + # wrapping the process with a sleep or a cron. + # + def wrap_in_schedule (raw_expression, options) - raw_exp.store_itself + oat = options[:at] + oin = options[:in] + ocron = options[:cron] + oevery = options[:every] - raw_exp - else + fei = new_fei( + :workflow_instance_id => get_wfid_generator.generate(nil), + :workflow_definition_name => 'schedlaunch', + :expression_name => 'sequence') - raw_expression + # not very happy with this code, it builds custom + # wrapping processes manually, maybe there is + # a more elegant way, but for now, it's ok. + + template = if oat or oin + + sleep_atts = if oat + { 'until' => oat } + else #oin + { 'for' => oin } end - end + sleep_atts['scheduler-tags'] = "scheduled-launch, #{fei.wfid}" - # - # Removes an environment, especially takes care of unbinding - # any special value it may contain. - # - def remove_environment (environment_id) + raw_expression.new_environment + raw_expression.store_itself - ldebug { "remove_environment() #{environment_id.to_debug_s}" } + [ + 'sequence', {}, [ + [ 'sleep', sleep_atts, [] ], + raw_expression.fei + ] + ] - env, fei = fetch(environment_id) + elsif ocron or oevery - return unless env - # - # env already unbound and removed + fei.expression_name = 'cron' - env.unbind + cron_atts = if ocron + { 'tab' => ocron } + else #oevery + { 'every' => oevery } + end + cron_atts['name'] = "//cron_launch__#{fei.wfid}" + cron_atts['scheduler-tags'] = "scheduled-launch, #{fei.wfid}" - #get_expression_storage().delete(environment_id) + template = raw_expression.raw_representation + remove(raw_expression) - onotify :remove, environment_id + [ 'cron', cron_atts, [ template ] ] + + else + + nil # don't schedule at all end - # - # Prepares a new instance of InFlowWorkItem from a LaunchItem - # instance. - # - def build_workitem (launchitem) + if template - wi = InFlowWorkItem.new + raw_exp = RawExpression.new_raw( + fei, nil, nil, @application_context, template) - wi.attributes = launchitem.attributes.dup + #raw_exp.store_itself + raw_exp.new_environment - wi + raw_exp + else + + raw_expression end + end - # - # Builds a FlowExpressionId instance for a process being - # launched. - # - def new_fei (launchitem, flow_name, flow_revision, exp_name) + # + # Removes an environment, especially takes care of unbinding + # any special value it may contain. + # + def remove_environment (environment_id) - url = if launchitem - launchitem.workflow_definition_url - else - "no-url" - end + #ldebug { "remove_environment() #{environment_id.to_debug_s}" } - fei = FlowExpressionId.new + env, fei = fetch(environment_id) - fei.owfe_version = OPENWFERU_VERSION - fei.engine_id = OpenWFE::stu get_engine.service_name.to_s - fei.initial_engine_id = OpenWFE::stu fei.engine_id - fei.workflow_definition_url = OpenWFE::stu url - fei.workflow_definition_name = OpenWFE::stu flow_name - fei.workflow_definition_revision = OpenWFE::stu flow_revision - fei.wfid = get_wfid_generator.generate launchitem - fei.expression_id = "0" - fei.expression_name = exp_name + return unless env + # + # env already unbound and removed - fei - end + env.unbind - # - # Builds the RawExpression instance at the root of the flow - # being launched. - # - # The param can be a template or a definition (anything - # accepted by the determine_representation() method). - # - def build_raw_expression (launchitem, param) + onotify(:remove, environment_id) + end - procdef = determine_rep param + # + # Builds a FlowExpressionId instance for a process being + # launched. + # + def new_fei (h) - atts = procdef[1] - flow_name = atts['name'] || "noname" - flow_revision = atts['revision'] || "0" - exp_name = procdef.first + h[:engine_id] = OpenWFE::stu(get_engine.engine_name) - fei = new_fei launchitem, flow_name, flow_revision, exp_name + %w{ url name revision }.each { |k| stu(h, k) } - RawExpression.new_raw( - fei, nil, nil, @application_context, procdef) - end + FlowExpressionId.new_fei(h) + end - # - # Given a [replying] child flow expression, will update its parent - # raw expression if the child raw_expression changed. - # - # This is used to keep track of in-flight modification to running - # process instances. - # - def track_child_raw_representation (fexp) + def stu (h, key) - return unless fexp.raw_rep_updated == true + key = "workflow_definition_#{key}".intern + v = h[key] + h[key] = OpenWFE::stu(v.to_s) if v + end - parent = fetch_expression fexp.parent_id + # + # Given a [replying] child flow expression, will update its parent + # raw expression if the child raw_expression changed. + # + # This is used to keep track of in-flight modification to running + # process instances. + # + def track_child_raw_representation (fexp) - return if parent.class.uses_template? + return unless fexp.raw_rep_updated == true - parent.raw_children[fexp.fei.child_id.to_i] = - fexp.raw_representation + parent = fetch_expression(fexp.parent_id) - parent.store_itself - end + #p [ :storing, fexp.raw_representation, fexp.fei.to_short_s ] + + parent.raw_children[fexp.fei.child_id.to_i] = fexp.raw_representation + + parent.store_itself + end end end