lib/openwfe/expool/expressionpool.rb in ruote-0.9.18 vs lib/openwfe/expool/expressionpool.rb in ruote-0.9.19

- old
+ new

@@ -46,1099 +46,1092 @@ require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' require 'openwfe/util/observable' require 'openwfe/expool/parser' require 'openwfe/expool/representation' +require 'openwfe/expool/paused_error' +require 'openwfe/expool/expool_pause_methods' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw' -require 'rufus/lru' # gem 'rufus-lru' require 'rufus/verbs' # gem 'rufus-lru' module OpenWFE - # - # The ExpressionPool stores expressions (pieces of workflow instance). - # It's the core of the workflow engine. - # It relies on an expression storage for actual persistence of the - # expressions. - # - class ExpressionPool - include ServiceMixin - include OwfeServiceLocator - include OwfeObservable - include FeiMixin + # + # The ExpressionPool stores expressions (pieces of workflow instance). + # It's the core of the workflow engine. + # It relies on an expression storage for actual persistence of the + # expressions. + # + class ExpressionPool - # - # The hash containing the wfid of the process instances currently - # paused. - # - attr_reader :paused_instances + include ServiceMixin + include OwfeServiceLocator + include OwfeObservable + include FeiMixin - # - # The constructor for the expression pool. - # - def initialize (service_name, application_context) + include ExpoolPauseMethods - super() + # + # The hash containing the wfid of the process instances currently + # paused (a cache). + # + attr_reader :paused_instances - service_init service_name, application_context + # + # The constructor for the expression pool. + # + def initialize (service_name, application_context) - @paused_instances = {} + super() - #@monitors = MonitorProvider.new(application_context) + service_init service_name, application_context - @observers = {} + @paused_instances = {} - @stopped = false + #@monitors = MonitorProvider.new(application_context) - engine_environment_id - # makes sure it's called now - end + @observers = {} - # - # Stops this expression pool (especially its workqueue). - # - def stop + @stopped = false - @stopped = true + engine_environment_id + # makes sure it's called now + end - onotify :stop - end + # + # Stops this expression pool (especially its workqueue). + # + def stop - #-- - # Obtains a unique monitor for an expression. - # It avoids the need for the FlowExpression instances to include - # the monitor mixin by themselves - # - #def get_monitor (fei) - # @monitors[fei] - #end - #++ + @stopped = true - # - # This method is called by the launch method. It's actually the first - # stage of that method. - # It may be interessant to use to 'validate' a launchitem and its - # process definition, as it will raise an exception in case - # of 'parameter' mismatch. - # - # There is a 'pre_launch_check' alias for this method in the - # Engine class. - # - def prepare_raw_expression (launchitem) + onotify :stop + end - wfdurl = launchitem.workflow_definition_url + #-- + # Obtains a unique monitor for an expression. + # It avoids the need for the FlowExpression instances to include + # the monitor mixin by themselves + # + #def get_monitor (fei) + # @monitors[fei] + #end + #++ - raise "launchitem.workflow_definition_url not set, cannot launch" \ - unless wfdurl + # + # This method is called by the launch method. It's actually the first + # stage of that method. + # It may be interessant to use to 'validate' a launchitem and its + # process definition, as it will raise an exception in case + # of 'parameter' mismatch. + # + # There is a 'pre_launch_check' alias for this method in the + # Engine class. + # + def prepare_raw_expression (launchitem) - definition = if wfdurl.match "^field:" + wfdurl = launchitem.workflow_definition_url - raise( - ":definition_in_launchitem_allowed not set to true, "+ - "cannot launch" - ) if ac[:definition_in_launchitem_allowed] != true + raise "launchitem.workflow_definition_url not set, cannot launch" \ + unless wfdurl - wfdfield = wfdurl[6..-1] - launchitem.attributes.delete wfdfield - else + definition = if wfdurl.match "^field:" - read_uri wfdurl - end + raise( + ":definition_in_launchitem_allowed not set to true, "+ + "cannot launch" + ) if ac[:definition_in_launchitem_allowed] != true - raise "didn't find process definition at '#{wfdurl}'" \ - unless definition + wfdfield = wfdurl[6..-1] + launchitem.attributes.delete wfdfield + else - raw_expression = build_raw_expression launchitem, definition + read_uri wfdurl + end - raw_expression.check_parameters launchitem - # - # will raise an exception if there are requirements - # and one of them is not met + raise "didn't find process definition at '#{wfdurl}'" \ + unless definition - raw_expression - end + raw_expression = build_raw_expression launchitem, definition + raw_expression.check_parameters launchitem # - # Instantiates a workflow definition and launches it. - # - # This method call will return immediately, it could even return - # before the actual launch is completely over. - # - # Returns the FlowExpressionId instance of the root expression of - # the newly launched flow. - # - def launch (launchitem, options={}) + # will raise an exception if there are requirements + # and one of them is not met - # - # prepare raw expression + raw_expression + end - raw_expression = prepare_raw_expression launchitem - # - # will raise an exception if there are requirements - # and one of them is not met + # + # Instantiates a workflow definition and launches it. + # + # This method call will return immediately, it could even return + # before the actual launch is completely over. + # + # Returns the FlowExpressionId instance of the root expression of + # the newly launched flow. + # + def launch (launchitem, options={}) - raw_expression = wrap_in_schedule(raw_expression, options) \ - if options.size > 0 + wait = (options[:wait_for] == true) - raw_expression.new_environment - # - # as this expression is the root of a new process instance, - # it has to have an environment for all the variables of - # the process instance + # + # prepare raw expression - fei = raw_expression.fei + raw_expression = prepare_raw_expression launchitem + # + # will raise an exception if there are requirements + # and one of them is not met - # - # apply prepared raw expression + raw_expression = wrap_in_schedule(raw_expression, options) \ + if options.size > 0 - wi = build_workitem launchitem + raw_expression.new_environment + # + # as this expression is the root of a new process instance, + # it has to have an environment for all the variables of + # the process instance - onotify :launch, fei, launchitem + fei = raw_expression.fei - apply raw_expression, wi + # + # apply prepared raw expression - fei - end + wi = build_workitem launchitem - # - # This is the first stage of the tlaunch_child() method. - # - # (it's used by the concurrent iterator when preparing all its - # iteration children) - # - def tprepare_child ( - parent_exp, template, sub_id, register_child, vars) + onotify :launch, fei, launchitem - return fetch_expression(template) \ - if template.is_a?(FlowExpressionId) + if wait + wait_for(fei) { apply raw_expression, wi } + else + apply raw_expression, wi + fei + end + end - fei = parent_exp.fei.dup - fei.expression_name = template.first - fei.expression_id = "#{fei.expid}.#{sub_id}" + # + # This is the first stage of the tlaunch_child() method. + # + # (it's used by the concurrent iterator when preparing all its + # iteration children) + # + def tprepare_child ( + parent_exp, template, sub_id, register_child, vars) - raw_exp = RawExpression.new_raw( - fei, nil, nil, @application_context, template) + return fetch_expression(template) \ + if template.is_a?(FlowExpressionId) - raw_exp.parent_id = parent_exp.fei + fei = parent_exp.fei.dup + fei.expression_name = template.first + fei.expression_id = "#{fei.expid}.#{sub_id}" - if vars - raw_exp.new_environment vars - else - raw_exp.environment_id = parent_exp.environment_id - end + raw_exp = RawExpression.new_raw( + fei, nil, nil, @application_context, template) - #workitem.fei = raw_exp.fei - # done in do_apply... + raw_exp.parent_id = parent_exp.fei - if register_child - (parent_exp.children ||= []) << raw_exp.fei - update raw_exp - end + if vars + raw_exp.new_environment vars + else + raw_exp.environment_id = parent_exp.environment_id + end - raw_exp - end + #workitem.fei = raw_exp.fei + # done in do_apply... - # - # Launches the given template (sexp) as the child of its - # parent expression. - # - # If the last, register_child, is set to true, this method will - # take care of adding the new child to the parent expression. - # - # (used by 'cron' and more) - # - def tlaunch_child ( - parent_exp, template, sub_id, workitem, register_child, vars=nil) + if register_child + (parent_exp.children ||= []) << raw_exp.fei + update raw_exp + end - raw_exp = tprepare_child( - parent_exp, template, sub_id, register_child, vars) + raw_exp + end - onotify :tlaunch_child, raw_exp.fei, workitem + # + # Launches the given template (sexp) as the child of its + # parent expression. + # + # If the last, register_child, is set to true, this method will + # take care of adding the new child to the parent expression. + # + # (used by 'cron' and more) + # + def tlaunch_child ( + parent_exp, template, sub_id, workitem, register_child, vars=nil) - apply raw_exp, workitem + raw_exp = tprepare_child( + parent_exp, template, sub_id, register_child, vars) - raw_exp.fei - end + onotify :tlaunch_child, raw_exp.fei, workitem - # - # Launches a template, but makes sure the new expression has no - # parent. - # - # (used by 'listen') - # - def tlaunch_orphan ( - firing_exp, template, sub_id, workitem, register_child) + apply raw_exp, workitem - fei = firing_exp.fei.dup - fei.expression_id = "#{fei.expid}.#{sub_id}" - fei.expression_name = template.first + raw_exp.fei + end - raw_exp = RawExpression.new_raw( - fei, nil, nil, @application_context, template) + # + # Launches a template, but makes sure the new expression has no + # parent. + # + # (used by 'listen') + # + def tlaunch_orphan ( + firing_exp, template, sub_id, workitem, register_child) - #raw_exp.parent_id = GONE_PARENT_ID - raw_exp.parent_id = nil - # it's an orphan, no parent + fei = firing_exp.fei.dup + fei.expression_id = "#{fei.expid}.#{sub_id}" + fei.expression_name = template.first - raw_exp.environment_id = firing_exp.environment_id - # tapping anyway into the firer's environment + raw_exp = RawExpression.new_raw( + fei, nil, nil, @application_context, template) - (firing_exp.children ||= []) << raw_exp.fei \ - if register_child + #raw_exp.parent_id = GONE_PARENT_ID + raw_exp.parent_id = nil + # it's an orphan, no parent - onotify :tlaunch_orphan, raw_exp.fei, workitem + raw_exp.environment_id = firing_exp.environment_id + # tapping anyway into the firer's environment - apply raw_exp, workitem + (firing_exp.children ||= []) << raw_exp.fei \ + if register_child - raw_exp.fei - end + onotify :tlaunch_orphan, raw_exp.fei, workitem - # - # Launches a subprocess. - # The resulting wfid is a subid for the wfid of the firing expression. - # - # (used by 'subprocess') - # - def launch_subprocess ( - firing_exp, template, forget, workitem, params) + apply raw_exp, workitem - raw_exp = if template.is_a?(FlowExpressionId) + raw_exp.fei + end - fetch_expression template + # + # Launches a subprocess. + # The resulting wfid is a subid for the wfid of the firing expression. + # + # (used by 'subprocess') + # + def launch_subprocess ( + firing_exp, template, forget, workitem, params) - elsif template.is_a?(RawExpression) + raw_exp = if template.is_a?(FlowExpressionId) - template.application_context = @application_context - template + fetch_expression template - else # probably an URI + elsif template.is_a?(RawExpression) - build_raw_expression nil, template - end + template.application_context = @application_context + template - raw_exp = raw_exp.dup - raw_exp.fei = raw_exp.fei.dup + else # probably an URI - if forget - raw_exp.parent_id = nil - else - raw_exp.parent_id = firing_exp.fei - end + build_raw_expression nil, template + end - #raw_exp.fei.wfid = get_wfid_generator.generate - #raw_exp.fei.wfid = - # "#{firing_exp.fei.wfid}.#{firing_exp.get_next_sub_id}" - raw_exp.fei.wfid = - "#{firing_exp.fei.parent_wfid}.#{firing_exp.get_next_sub_id}" + raw_exp = raw_exp.dup + raw_exp.fei = raw_exp.fei.dup - raw_exp.new_environment params + if forget + raw_exp.parent_id = nil + else + raw_exp.parent_id = firing_exp.fei + end - raw_exp.store_itself + #raw_exp.fei.wfid = get_wfid_generator.generate + #raw_exp.fei.wfid = + # "#{firing_exp.fei.wfid}.#{firing_exp.get_next_sub_id}" + raw_exp.fei.wfid = + "#{firing_exp.fei.parent_wfid}.#{firing_exp.get_next_sub_id}" - apply raw_exp, workitem + raw_exp.new_environment params - raw_exp.fei - end + raw_exp.store_itself - # - # Replaces the flow expression with a raw expression that has - # the same fei, same parent and points to the same env. - # The raw_representation will be the template. - # Stores and then apply the "cuckoo" expression. - # - def substitute_and_apply (fexp, template, workitem) + apply raw_exp, workitem - re = RawExpression.new_raw( - fexp.fei, - fexp.parent_id, - fexp.environment_id, - application_context, - template) + raw_exp.fei + end - update re + # + # Replaces the flow expression with a raw expression that has + # the same fei, same parent and points to the same env. + # The raw_representation will be the template. + # Stores and then apply the "cuckoo" expression. + # + def substitute_and_apply (fexp, template, workitem) - apply re, workitem - end + re = RawExpression.new_raw( + fexp.fei, + fexp.parent_id, + fexp.environment_id, + application_context, + template) - # - # Applies a given expression (id or expression) - # - def apply (exp_or_fei, workitem) + update re - get_workqueue.push( - self, :do_apply_reply, :apply, exp_or_fei, workitem) - end + apply re, workitem + end - # - # Replies to a given expression - # - def reply (exp_or_fei, workitem) + # + # Applies a given expression (id or expression) + # + def apply (exp_or_fei, workitem) - get_workqueue.push( - self, :do_apply_reply, :reply, exp_or_fei, workitem) - end + get_workqueue.push( + self, :do_apply_reply, :apply, exp_or_fei, workitem) + end - # - # Cancels the given expression. - # The param might be an expression instance or a FlowExpressionId - # instance. - # - def cancel (exp) + # + # Replies to a given expression + # + def reply (exp_or_fei, workitem) - exp, fei = fetch exp + get_workqueue.push( + self, :do_apply_reply, :reply, exp_or_fei, workitem) + end - unless exp - linfo { "cancel() cannot cancel missing #{fei.to_debug_s}" } - return nil - end + # + # Cancels the given expression. + # The param might be an expression instance or a FlowExpressionId + # instance. + # + def cancel (exp) - ldebug { "cancel() for #{fei.to_debug_s}" } + exp, fei = fetch exp - onotify :cancel, exp + unless exp + linfo { "cancel() cannot cancel missing #{fei.to_debug_s}" } + return nil + end - wi = exp.cancel + ldebug { "cancel() for #{fei.to_debug_s}" } - remove exp + onotify :cancel, exp - wi - end + wi = exp.cancel - # - # Cancels the given expression and makes sure to resume the flow - # if the expression or one of its children were active. - # - # If the cancelled branch was not active, this method will take - # care of removing the cancelled expression from the parent - # expression. - # - def cancel_expression (exp) + remove exp - exp = fetch_expression exp + wi + end - wi = cancel exp + # + # Cancels the given expression and makes sure to resume the flow + # if the expression or one of its children were active. + # + # If the cancelled branch was not active, this method will take + # care of removing the cancelled expression from the parent + # expression. + # + def cancel_expression (exp) - # ( remember that in case of error, no wi could get returned...) + exp = fetch_expression exp - if wi + wi = cancel exp - reply_to_parent exp, wi, false + # ( remember that in case of error, no wi could get returned...) - elsif exp.parent_id + if wi - parent_exp = fetch_expression exp.parent_id - parent_exp.remove_child(exp.fei) if parent_exp - end - end + reply_to_parent exp, wi, false - # - # Given any expression of a process, cancels the complete process - # instance. - # - def cancel_process (exp_or_wfid) + elsif exp.parent_id - wfid = extract_wfid exp_or_wfid, false + parent_exp = fetch_expression exp.parent_id + parent_exp.remove_child(exp.fei) if parent_exp + end + end - ldebug { "cancel_process() '#{wfid}'" } + # + # Given any expression of a process, cancels the complete process + # instance. + # + def cancel_process (exp_or_wfid) - root = fetch_root wfid + wfid = extract_wfid exp_or_wfid, false + # 'true' would have made sure that the parent wfid is used... - raise "no process to cancel '#{wfid}'" unless root + ldebug { "cancel_process() '#{wfid}'" } - cancel root - end - alias :cancel_flow :cancel_process - # - # Forgets the given expression (make it an orphan). - # - def forget (parent_exp, exp) + root = fetch_root wfid - exp, fei = fetch exp + raise "no process to cancel '#{wfid}'" unless root - #ldebug { "forget() forgetting #{fei}" } + cancel root + end + alias :cancel_flow :cancel_process - return if not exp + # + # Forgets the given expression (make it an orphan). + # + def forget (parent_exp, exp) - onotify :forget, exp + exp, fei = fetch exp - parent_exp.children.delete(fei) + #ldebug { "forget() forgetting #{fei}" } - #exp.parent_id = GONE_PARENT_ID - exp.parent_id = nil + return if not exp - exp.dup_environment - exp.store_itself() + onotify :forget, exp - ldebug { "forget() forgot #{fei}" } - end + parent_exp.children.delete(fei) - # - # Replies to the parent of the given expression. - # - def reply_to_parent (exp, workitem, remove=true) + #exp.parent_id = GONE_PARENT_ID + exp.parent_id = nil - ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } + exp.dup_environment + exp.store_itself() - workitem.last_expression_id = exp.fei + ldebug { "forget() forgot #{fei}" } + end - onotify :reply_to_parent, exp, workitem + # + # Replies to the parent of the given expression. + # + def reply_to_parent (exp, workitem, remove=true) - if remove + ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } - remove exp - # - # remove the expression itself + workitem.last_expression_id = exp.fei - exp.clean_children - # - # remove all the children of the expression - end + onotify :reply_to_parent, exp, workitem - # - # manage tag, have to remove it so it can get 'redone' or 'undone' - # (preventing abuse) + if remove - tagname = exp.attributes["tag"] if exp.attributes + remove exp + # + # remove the expression itself - exp.delete_variable(tagname) if tagname + exp.clean_children + # + # remove all the children of the expression + end - # - # has raw_expression been updated ? + # + # manage tag, have to remove it so it can get 'redone' or 'undone' + # (preventing abuse) - track_child_raw_representation exp + tagname = exp.attributes["tag"] if exp.attributes - # - # flow terminated ? + exp.delete_variable(tagname) if tagname - #if not exp.parent_id - if (not exp.parent_id) and (exp.fei.expid == '0') + # + # has raw_expression been updated ? - ldebug do - "reply_to_parent() process " + - "#{exp.fei.workflow_instance_id} terminated" - end + track_child_raw_representation exp - onotify :terminate, exp, workitem + # + # flow terminated ? - return - end + if (not exp.parent_id) and (exp.fei.expid == '0') - # - # else, gone parent ? + ldebug do + "reply_to_parent() process " + + "#{exp.fei.workflow_instance_id} terminated" + end - #if exp.parent_id == GONE_PARENT_ID - if (not exp.parent_id) or (exp.parent_id.expname == 'gone') - # this 'gone' is kept for some level of 'backward compatibility' + onotify :terminate, exp, workitem - ldebug do - "reply_to_parent() parent is gone for " + - exp.fei.to_debug_s - end + return + end - return - end + # + # else, gone parent ? - # - # parent still present, reply to it + if (not exp.parent_id) or (exp.parent_id.expname == 'gone') + # this 'gone' is kept for some level of 'backward compatibility' - reply exp.parent_id, workitem + ldebug do + "reply_to_parent() parent is gone for " + + exp.fei.to_debug_s end - # - # Adds or updates a flow expression in this pool - # - def update (flow_expression) + return + end - ldebug { "update() for #{flow_expression.fei.to_debug_s}" } + # + # parent still present, reply to it - #t = Timer.new + reply exp.parent_id, workitem + end - onotify :update, flow_expression.fei, flow_expression + # + # Adds or updates a flow expression in this pool + # + def update (flow_expression) - #ldebug do - # "update() took #{t.duration} ms " + - # "#{flow_expression.fei.to_debug_s}" - #end + flow_expression.updated_at = Time.now - flow_expression - end + ldebug { "update() for #{flow_expression.fei.to_debug_s}" } - # - # Fetches a FlowExpression from the pool. - # Returns a tuple : the FlowExpression plus its FlowExpressionId. - # - # The param 'exp' may be a FlowExpressionId or a FlowExpression that - # has to be reloaded. - # - def fetch (exp) - #synchronize do + #t = Timer.new - #ldebug { "fetch() exp is of kind #{exp.class}" } + onotify :update, flow_expression.fei, flow_expression - fei = if exp.is_a?(FlowExpression) + #ldebug do + # "update() took #{t.duration} ms " + + # "#{flow_expression.fei.to_debug_s}" + #end - exp.fei + flow_expression + end - elsif not exp.is_a?(FlowExpressionId) + # + # Fetches a FlowExpression from the pool. + # Returns a tuple : the FlowExpression plus its FlowExpressionId. + # + # The param 'exp' may be a FlowExpressionId or a FlowExpression that + # has to be reloaded. + # + def fetch (exp) + #synchronize do - raise \ - "Cannot fetch expression with key : "+ - "'#{fei}' (#{fei.class})" + #ldebug { "fetch() exp is of kind #{exp.class}" } - else + fei = if exp.is_a?(FlowExpression) - exp - end + exp.fei - #ldebug { "fetch() for #{fei.to_debug_s}" } + elsif not exp.is_a?(FlowExpressionId) - [ get_expression_storage[fei], fei ] - #end - end + raise \ + "Cannot fetch expression with key : "+ + "'#{fei}' (#{fei.class})" - # - # Fetches a FlowExpression (returns only the FlowExpression instance) - # - # The param 'exp' may be a FlowExpressionId or a FlowExpression that - # has to be reloaded. - # - def fetch_expression (exp) + else - exp, fei = fetch exp - exp - end + exp + end - # - # Returns the engine environment (the top level environment) - # - def fetch_engine_environment - #synchronize do - # - # synchronize to ensure that there's 1! engine env + #ldebug { "fetch() for #{fei.to_debug_s}" } - eei = engine_environment_id - ee, fei = fetch eei + [ get_expression_storage[fei], fei ] + #end + end - return ee if ee + # + # Fetches a FlowExpression (returns only the FlowExpression instance) + # + # The param 'exp' may be a FlowExpressionId or a FlowExpression that + # has to be reloaded. + # + def fetch_expression (exp) - ee = Environment.new_env( - eei, nil, nil, @application_context, nil) + exp, fei = fetch exp + exp + end - ee.store_itself - - ee - #end - end - + # + # Returns the engine environment (the top level environment) + # + def fetch_engine_environment + #synchronize do # - # Fetches the root expression of a process (or a subprocess). - # - def fetch_root (wfid) + # synchronize to ensure that there's 1! engine env - get_expression_storage.fetch_root wfid - end + eei = engine_environment_id + ee, fei = fetch eei - # - # Removes a flow expression from the pool - # (This method is mainly called from the pool itself) - # - def remove (exp) + return ee if ee - exp, _fei = fetch(exp) \ - if exp.is_a?(FlowExpressionId) + ee = Environment.new_env( + eei, nil, nil, @application_context, nil) - return unless exp + ee.store_itself - ldebug { "remove() fe #{exp.fei.to_debug_s}" } + ee + #end + end - onotify :remove, exp.fei + # + # Fetches the root expression of a process (or a subprocess). + # + def fetch_root (wfid) - #synchronize do - #@monitors.delete(exp.fei) + get_expression_storage.fetch_root wfid + end - remove_environment(exp.environment_id) \ - if exp.owns_its_environment? - #end - end + # + # Removes a flow expression from the pool + # (This method is mainly called from the pool itself) + # + def remove (exp) - # - # This method is called at each expool (engine) [re]start. - # It roams through the previously saved (persisted) expressions - # to reschedule ones like 'sleep' or 'cron'. - # - def reschedule + exp, _fei = fetch(exp) \ + if exp.is_a?(FlowExpressionId) - return if @stopped + return unless exp - #synchronize do + ldebug { "remove() fe #{exp.fei.to_debug_s}" } - t = OpenWFE::Timer.new + onotify :remove, exp.fei - linfo { "reschedule() initiating..." } + #synchronize do + #@monitors.delete(exp.fei) - options = { :include_classes => Rufus::Schedulable } + remove_environment(exp.environment_id) \ + if exp.owns_its_environment? + #end + end - get_expression_storage.find_expressions(options).each do |fexp| + # + # This method is called at each expool (engine) [re]start. + # It roams through the previously saved (persisted) expressions + # to reschedule ones like 'sleep' or 'cron'. + # + def reschedule - linfo { "reschedule() for #{fexp.fei.to_s}..." } + return if @stopped - onotify :reschedule, fexp.fei + t = OpenWFE::Timer.new - fexp.reschedule get_scheduler - end + linfo { "reschedule() initiating..." } - linfo { "reschedule() done. (took #{t.duration} ms)" } - #end - end + options = { :include_classes => Rufus::Schedulable } - # - # Returns the unique engine_environment FlowExpressionId instance. - # There is only one such environment in an engine, hence this - # 'singleton' method. - # - def engine_environment_id - #synchronize do - # no need, it's been already called at initialization + get_expression_storage.find_expressions(options).each do |fexp| - return @eei if @eei + linfo { "reschedule() for #{fexp.fei.to_s}..." } - @eei = FlowExpressionId.new - @eei.owfe_version = OPENWFERU_VERSION - @eei.engine_id = get_engine.engine_name - @eei.initial_engine_id = @eei.engine_id - @eei.workflow_definition_url = 'ee' - @eei.workflow_definition_name = 'ee' - @eei.workflow_definition_revision = '0' - @eei.workflow_instance_id = '0' - @eei.expression_name = EN_ENVIRONMENT - @eei.expression_id = '0' - @eei - #end - end + onotify :reschedule, fexp.fei - # - # Returns the list of applied expressions belonging to a given - # workflow instance. - # - # If the unapplied optional parameter is set to true, all the - # expressions (even those not yet applied) that compose the process - # instance will be returned. Environments will be returned as well. - # - def process_stack (wfid, unapplied=false) + fexp.reschedule get_scheduler + end - #raise "please provide a non-nil workflow instance id" \ - # unless wfid + linfo { "reschedule() done. (took #{t.duration} ms)" } + end - wfid = extract_wfid wfid, true + # + # Returns the unique engine_environment FlowExpressionId instance. + # There is only one such environment in an engine, hence this + # 'singleton' method. + # + def engine_environment_id + #synchronize do + # no need, it's been already called at initialization - params = { - #:exclude_classes => [ Environment, RawExpression ], - #:exclude_classes => [ Environment ], - :parent_wfid => wfid - } - params[:applied] = true if (not unapplied) + return @eei if @eei - stack = get_expression_storage.find_expressions params + @eei = FlowExpressionId.new + @eei.owfe_version = OPENWFERU_VERSION + @eei.engine_id = get_engine.engine_name + @eei.initial_engine_id = @eei.engine_id + @eei.workflow_definition_url = 'ee' + @eei.workflow_definition_name = 'ee' + @eei.workflow_definition_revision = '0' + @eei.workflow_instance_id = '0' + @eei.expression_name = EN_ENVIRONMENT + @eei.expression_id = '0' + @eei + #end + end - stack.extend(RepresentationMixin) if unapplied + # + # Returns the list of applied expressions belonging to a given + # workflow instance. + # + # If the unapplied optional parameter is set to true, all the + # expressions (even those not yet applied) that compose the process + # instance will be returned. Environments will be returned as well. + # + def process_stack (wfid, unapplied=false) - stack - end + #raise "please provide a non-nil workflow instance id" \ + # unless wfid - # - # Lists all workflows (processes) currently in the expool (in - # the engine). - # This method will return a list of "process-definition" expressions - # (root of flows). - # - def list_processes (options={}) + wfid = extract_wfid wfid, true - options[:include_classes] = DefineExpression - # - # Maybe it would be better to list root expressions instead - # so that expressions like 'sequence' can be used - # as root expressions. Later... + params = { + #:exclude_classes => [ Environment, RawExpression ], + #:exclude_classes => [ Environment ], + :parent_wfid => wfid + } + params[:applied] = true if (not unapplied) - get_expression_storage.find_expressions options - end + stack = get_expression_storage.find_expressions params - # - # This method is called when apply() or reply() failed for - # an expression. - # There are currently only two 'users', the ParticipantExpression - # class and the do_process_workelement method of this ExpressionPool - # class. - # - def notify_error (error, fei, message, workitem) + stack.extend(RepresentationMixin) if unapplied - fei = extract_fei fei - # densha requires that... :( + stack + end - se = OpenWFE::exception_to_s error + # + # Lists all workflows (processes) currently in the expool (in + # the engine). + # This method will return a list of "process-definition" expressions + # (root of flows). + # + def list_processes (options={}) - onotify :error, fei, message, workitem, error.class.name, se + options[:include_classes] = DefineExpression + # + # Maybe it would be better to list root expressions instead + # so that expressions like 'sequence' can be used + # as root expressions. Later... - #fei = extract_fei fei + get_expression_storage.find_expressions options + end - if error.is_a?(PausedError) - lwarn do - "#{self.service_name} " + - "operation :#{message.to_s} on #{fei.to_s} " + - "delayed because process '#{fei.wfid}' is in pause" - end - else - lwarn do - "#{self.service_name} " + - "operation :#{message.to_s} on #{fei.to_s} " + - "failed with\n" + se - end - end - end + # + # This method is called when apply() or reply() failed for + # an expression. + # There are currently only two 'users', the ParticipantExpression + # class and the do_process_workelement method of this ExpressionPool + # class. + # + def notify_error (error, fei, message, workitem) - # - # Gets the process definition (if necessary) and turns into - # into an expression tree (for storing into a RawExpression). - # - def determine_rep (param) + fei = extract_fei fei + # densha requires that... :( - param = read_uri(param) if param.is_a?(URI) + se = OpenWFE::exception_to_s error - DefParser.parse param + #onotify :error, fei, message, workitem, error.class.name, se + onotify(:error, fei, message, workitem, error.class.name, error.to_s) + + if error.is_a?(PausedError) + lwarn do + "#{self.service_name} " + + "operation :#{message.to_s} on #{fei.to_s} " + + "delayed because process '#{fei.wfid}' is in pause" end + else + lwarn do + "#{self.service_name} " + + "operation :#{message.to_s} on #{fei.to_s} " + + "failed with\n" + se + end + end + end - # - # Returns true if the process instance to which the expression - # belongs is currently paused. - # - def is_paused? (expression) + # + # Gets the process definition (if necessary) and turns into + # into an expression tree (for storing into a RawExpression). + # + def determine_rep (param) - (@paused_instances[expression.fei.parent_wfid] != nil) - end + param = read_uri(param) if param.is_a?(URI) - protected + #DefParser.parse param + get_def_parser.parse param + end - # - # This is the only point in the expression pool where an URI - # is read, so this is where the :remote_definitions_allowed - # security check is enforced. - # - def read_uri (uri) + # + # Returns true if the process instance to which the expression + # belongs is currently paused. + # + def is_paused? (expression) - uri = URI.parse uri.to_s + (@paused_instances[expression.fei.parent_wfid] != nil) + end - raise ":remote_definitions_allowed is set to false" \ - if (ac[:remote_definitions_allowed] != true and - uri.scheme and - uri.scheme != 'file') + protected - #open(uri.to_s).read + # + # If the launch option :wait_for is set to true, this method + # will be called to apply the raw_expression. It will only return + # when the launched process is over, which means it terminated, it + # had an error or it got cancelled. + # + def wait_for (fei_or_wfid) - f = Rufus::Verbs.fopen uri - result = f.read - f.close if f.respond_to?(:close) + wfid = extract_wfid fei_or_wfid, false - result - end + t = Thread.current + result = nil - # - # This is the method called [asynchronously] by the WorkQueue - # upon apply/reply. - # - def do_apply_reply (direction, exp_or_fei, workitem) + to = get_expression_pool.add_observer(:terminate) do |c, fe, wi| + if fe.fei.workflow_instance_id == wfid + result = [ :terminate, wi, fei_or_wfid ] + t.wakeup + end + end + te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e| + if fei.parent_wfid == wfid + result = [ :error, e, fei_or_wfid ] + t.wakeup + end + end + tc = get_expression_pool.add_observer(:cancel) do |c, fe| + if fe.fei.wfid == wfid and fe.fei.expid == '0' + result = [ :cancel, wfid, fei_or_wfid ] + t.wakeup + end + end - fei = nil + #apply raw_expression, wi + yield if block_given? - begin + Thread.stop unless result - exp, fei = if exp_or_fei.is_a?(FlowExpressionId) - fetch exp_or_fei - else - [ exp_or_fei, exp_or_fei.fei ] - end + linfo { "wait_for() '#{wfid}' is over" } - #p [ direction, fei.wfid, fei.expid, fei.expname ] - # - # I uncomment that sometimes to see how the stack - # grows (wfids and expids) + get_expression_pool.remove_observer to, :terminate + get_expression_pool.remove_observer te, :error + get_expression_pool.remove_observer tc, :cancel - ldebug { - ":#{direction} "+ - "target #{fei.to_debug_s}" } + result + end - if not exp + # + # This is the only point in the expression pool where an URI + # is read, so this is where the :remote_definitions_allowed + # security check is enforced. + # + def read_uri (uri) - #raise "apply() cannot apply missing #{_fei.to_debug_s}" - # not very helpful anyway + uri = URI.parse uri.to_s - lwarn { "do_apply_reply() cannot find >#{fei}" } + raise ":remote_definitions_allowed is set to false" \ + if (ac[:remote_definitions_allowed] != true and + uri.scheme and + uri.scheme != 'file') - return - end + #open(uri.to_s).read - check_if_paused exp + f = Rufus::Verbs.fopen uri + result = f.read + f.close if f.respond_to?(:close) - workitem.fei = exp.fei if direction == :apply + result + end - onotify direction, exp, workitem + # + # This is the method called [asynchronously] by the WorkQueue + # upon apply/reply. + # + def do_apply_reply (direction, exp_or_fei, workitem) - exp.send direction, workitem + fei = nil - rescue Exception => e + begin - notify_error e, fei, direction, workitem - end - end + exp, fei = if exp_or_fei.is_a?(FlowExpressionId) + fetch exp_or_fei + else + [ exp_or_fei, exp_or_fei.fei ] + end + #p [ direction, fei.wfid, fei.expid, fei.expname ] # - # Will raise an exception if the expression belongs to a paused - # process. - # - def check_if_paused (expression) + # I uncomment that sometimes to see how the stack + # grows (wfids and expids) - wfid = expression.fei.parent_wfid + ldebug { + ":#{direction} "+ + "target #{fei.to_debug_s}" } - raise PausedError.new(wfid) if @paused_instances[wfid] - end + if not exp - # - # if the launch method is called with a schedule option - # (like :at, :in, :cron and :every), this method takes care of - # wrapping the process with a sleep or a cron. - # - def wrap_in_schedule (raw_expression, options) + #raise "apply() cannot apply missing #{_fei.to_debug_s}" + # not very helpful anyway - oat = options[:at] - oin = options[:in] - ocron = options[:cron] - oevery = options[:every] + lwarn { "do_apply_reply() cannot find >#{fei}" } - fei = new_fei nil, "schedlaunch", "0", "sequence" + return + end - # not very happy with this code, it builds custom - # wrapping processes manually, maybe there is - # a more elegant way, but for now, it's ok. + check_if_paused exp - template = if oat or oin + workitem.fei = exp.fei if direction == :apply - sleep_atts = if oat - { "until" => oat } - else #oin - { "for" => oin } - end - sleep_atts["scheduler-tags"] = "scheduled-launch" + onotify direction, exp, workitem - raw_expression.new_environment - raw_expression.store_itself + exp.send direction, workitem - [ - "sequence", {}, [ - [ "sleep", sleep_atts, [] ], - raw_expression.fei - ] - ] + rescue Exception => e - elsif ocron or oevery + notify_error e, fei, direction, workitem + end + end - fei.expression_name = "cron" + # + # Will raise an exception if the expression belongs to a paused + # process. + # + def check_if_paused (expression) - cron_atts = if ocron - { "tab" => ocron } - else #oevery - { "every" => oevery } - end - cron_atts["name"] = "//cron_launch__#{fei.wfid}" - cron_atts["scheduler-tags"] = "scheduled-launch" + wfid = expression.fei.parent_wfid - template = raw_expression.raw_representation - remove raw_expression + raise PausedError.new(wfid) if @paused_instances[wfid] + end - [ "cron", cron_atts, [ template ] ] + # + # if the launch method is called with a schedule option + # (like :at, :in, :cron and :every), this method takes care of + # wrapping the process with a sleep or a cron. + # + def wrap_in_schedule (raw_expression, options) - else + oat = options[:at] + oin = options[:in] + ocron = options[:cron] + oevery = options[:every] - nil # don't schedule at all - end + fei = new_fei nil, "schedlaunch", "0", "sequence" - if template + # not very happy with this code, it builds custom + # wrapping processes manually, maybe there is + # a more elegant way, but for now, it's ok. - raw_exp = RawExpression.new_raw( - fei, nil, nil, @application_context, template) + template = if oat or oin - raw_exp.store_itself + sleep_atts = if oat + { "until" => oat } + else #oin + { "for" => oin } + end + sleep_atts["scheduler-tags"] = "scheduled-launch, #{fei.wfid}" - raw_exp - else + raw_expression.new_environment + raw_expression.store_itself - raw_expression - end - end + [ + "sequence", {}, [ + [ "sleep", sleep_atts, [] ], + raw_expression.fei + ] + ] - # - # Removes an environment, especially takes care of unbinding - # any special value it may contain. - # - def remove_environment (environment_id) + elsif ocron or oevery - ldebug { "remove_environment() #{environment_id.to_debug_s}" } + fei.expression_name = "cron" - env, fei = fetch(environment_id) + cron_atts = if ocron + { "tab" => ocron } + else #oevery + { "every" => oevery } + end + cron_atts["name"] = "//cron_launch__#{fei.wfid}" + cron_atts["scheduler-tags"] = "scheduled-launch, #{fei.wfid}" - return unless env - # - # env already unbound and removed + template = raw_expression.raw_representation + remove raw_expression - env.unbind + [ "cron", cron_atts, [ template ] ] - #get_expression_storage().delete(environment_id) + else - onotify :remove, environment_id - end + nil # don't schedule at all + end - # - # Prepares a new instance of InFlowWorkItem from a LaunchItem - # instance. - # - def build_workitem (launchitem) + if template - wi = InFlowWorkItem.new + raw_exp = RawExpression.new_raw( + fei, nil, nil, @application_context, template) - wi.attributes = launchitem.attributes.dup + raw_exp.store_itself - wi - end + raw_exp + else - # - # Builds a FlowExpressionId instance for a process being - # launched. - # - def new_fei (launchitem, flow_name, flow_revision, exp_name) + raw_expression + end + end - url = if launchitem - launchitem.workflow_definition_url - else - "no-url" - end + # + # Removes an environment, especially takes care of unbinding + # any special value it may contain. + # + def remove_environment (environment_id) - fei = FlowExpressionId.new + ldebug { "remove_environment() #{environment_id.to_debug_s}" } - fei.owfe_version = OPENWFERU_VERSION - fei.engine_id = OpenWFE::stu get_engine.service_name - fei.initial_engine_id = OpenWFE::stu fei.engine_id - fei.workflow_definition_url = OpenWFE::stu url - fei.workflow_definition_name = OpenWFE::stu flow_name - fei.workflow_definition_revision = OpenWFE::stu flow_revision - fei.wfid = get_wfid_generator.generate launchitem - fei.expression_id = "0" - fei.expression_name = exp_name + env, fei = fetch(environment_id) - fei - end + return unless env + # + # env already unbound and removed - # - # Builds the RawExpression instance at the root of the flow - # being launched. - # - # The param can be a template or a definition (anything - # accepted by the determine_representation() method). - # - def build_raw_expression (launchitem, param) + env.unbind - procdef = determine_rep param + #get_expression_storage().delete(environment_id) - atts = procdef[1] - flow_name = atts['name'] || "noname" - flow_revision = atts['revision'] || "0" - exp_name = procdef.first + onotify :remove, environment_id + end - fei = new_fei launchitem, flow_name, flow_revision, exp_name + # + # Prepares a new instance of InFlowWorkItem from a LaunchItem + # instance. + # + def build_workitem (launchitem) - RawExpression.new_raw( - fei, nil, nil, @application_context, procdef) - end + wi = InFlowWorkItem.new - # - # Given a [replying] child flow expression, will update its parent - # raw expression if the child raw_expression changed. - # - # This is used to keep track of in-flight modification to running - # process instances. - # - def track_child_raw_representation (fexp) + wi.attributes = launchitem.attributes.dup - return unless fexp.raw_rep_updated == true + wi + end - parent = fetch_expression fexp.parent_id + # + # Builds a FlowExpressionId instance for a process being + # launched. + # + def new_fei (launchitem, flow_name, flow_revision, exp_name) - return if parent.class.uses_template? + url = if launchitem + launchitem.workflow_definition_url + else + "no-url" + end - parent.raw_children[fexp.fei.child_id.to_i] = - fexp.raw_representation + fei = FlowExpressionId.new - parent.store_itself - end - end + fei.owfe_version = OPENWFERU_VERSION + fei.engine_id = OpenWFE::stu get_engine.service_name.to_s + fei.initial_engine_id = OpenWFE::stu fei.engine_id + fei.workflow_definition_url = OpenWFE::stu url + fei.workflow_definition_name = OpenWFE::stu flow_name + fei.workflow_definition_revision = OpenWFE::stu flow_revision + fei.wfid = get_wfid_generator.generate launchitem + fei.expression_id = "0" + fei.expression_name = exp_name - # - # This error is raised when an expression belonging to a paused - # process is applied or replied to. - # - class PausedError < RuntimeError + fei + end - attr_reader :wfid + # + # Builds the RawExpression instance at the root of the flow + # being launched. + # + # The param can be a template or a definition (anything + # accepted by the determine_representation() method). + # + def build_raw_expression (launchitem, param) - def initialize (wfid) + procdef = determine_rep param - super "process '#{wfid}' is paused" - @wfid = wfid - end + atts = procdef[1] + flow_name = atts['name'] || "noname" + flow_revision = atts['revision'] || "0" + exp_name = procdef.first - # - # Returns a hash for this PausedError instance. - # (simply returns the hash of the paused process' wfid). - # - def hash + fei = new_fei launchitem, flow_name, flow_revision, exp_name - @wfid.hash - end + RawExpression.new_raw( + fei, nil, nil, @application_context, procdef) + end - # - # Returns true if the other is a PausedError issued for the - # same process instance (wfid). - # - def == (other) + # + # Given a [replying] child flow expression, will update its parent + # raw expression if the child raw_expression changed. + # + # This is used to keep track of in-flight modification to running + # process instances. + # + def track_child_raw_representation (fexp) - return false unless other.is_a?(PausedError) + return unless fexp.raw_rep_updated == true - (@wfid == other.wfid) - end - end + parent = fetch_expression fexp.parent_id - #-- - # a small help class for storing monitors provided on demand - # to expressions that need them - # - #class MonitorProvider - # include MonitorMixin, Logging - # MAX_MONITORS = 10000 - # def initialize (application_context=nil) - # super() - # @application_context = application_context - # @monitors = LruHash.new(MAX_MONITORS) - # end - # def [] (key) - # synchronize do - # (@monitors[key] ||= Monitor.new) - # end - # end - # def delete (key) - # synchronize do - # #ldebug { "delete() removing Monitor for #{key}" } - # @monitors.delete(key) - # end - # end - #end - #++ + return if parent.class.uses_template? + + parent.raw_children[fexp.fei.child_id.to_i] = + fexp.raw_representation + + parent.store_itself + end + end end