lib/openwfe/expool/expressionpool.rb in openwferu-0.9.15 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.16

- old
+ new

@@ -188,11 +188,11 @@ # before the actual launch is completely over. # # Returns the FlowExpressionId instance of the root expression of # the newly launched flow. # - def launch (launchitem, options) + def launch (launchitem, options={}) # # prepare raw expression raw_expression = prepare_raw_expression launchitem @@ -205,11 +205,11 @@ # as this expression is the root of a new process instance, # it has to have an environment for all the variables of # the process instance raw_expression = wrap_in_schedule(raw_expression, options) \ - if options and options.size > 0 + if options.size > 0 fei = raw_expression.fei # # apply prepared raw expression @@ -232,22 +232,28 @@ # def prepare_from_template ( requesting_expression, sub_id, template, params=nil) rawexp = if template.is_a?(RawExpression) + + template.application_context = @application_context template + elsif template.is_a?(FlowExpressionId) - fetch_expression(template) + + fetch_expression template + else - build_raw_expression(nil, template) + + build_raw_expression nil, template end #raise "did not find subprocess in : #{template.to_s}" \ # unless rawexp - rawexp = rawexp.dup() - rawexp.fei = rawexp.fei.dup() + rawexp = rawexp.dup + rawexp.fei = rawexp.fei.dup if requesting_expression == nil rawexp.parent_id = nil rawexp.fei.workflow_instance_id = get_wfid_generator.generate @@ -274,15 +280,15 @@ #ldebug do # "launch_template() spawning wfid " + # "#{rawexp.fei.workflow_instance_id.to_s}" #end - env = rawexp.new_environment(params) + env = rawexp.new_environment params # # the new scope gets its own environment - rawexp.store_itself() + rawexp.store_itself rawexp end # @@ -301,27 +307,27 @@ apply rawexp, workitem rawexp.fei end - # + #-- # Evaluates a raw definition expression and # returns its body fei # - def evaluate (rawExpression, workitem) + #def evaluate (rawExpression, workitem) + # #rawExpression = fetch_expression(rawExpression) \ + # # if rawExpression.is_a?(FlowExpressionId) + # exp = rawExpression.instantiate_real_expression workitem + # fei = exp.evaluate workitem + # #remove(rawExpression) + # # + # # not necessary, the raw expression gets overriden by + # # the real expression + # fei + #end + #++ - exp = rawExpression.instantiate_real_expression workitem - fei = exp.evaluate workitem - - #remove(rawExpression) - # - # not necessary, the raw expression gets overriden by - # the real expression - - fei - end - # # Applies a given expression (id or expression) # def apply (exp, workitem) @@ -368,13 +374,13 @@ # care of removing the cancelled expression from the parent # expression. # def cancel_expression (exp) - exp = fetch_expression(exp) + exp = fetch_expression exp - wi = cancel(exp) + wi = cancel exp if wi reply_to_parent(exp, wi, false) else parent_exp = fetch_expression(exp.parent_id) @@ -417,62 +423,10 @@ ldebug { "forget() forgot #{fei}" } end # - # Pauses a process (sets its '/__paused__' variable to true). - # - def pause_process (wfid) - - wfid = extract_wfid(wfid) - - root_expression = fetch_root(wfid) - - root_expression.set_variable(VAR_PAUSED, true) - end - - # - # Restarts a process : removes its 'paused' flag (variable) and makes - # sure to 'replay' events (replies) that came for it while it was - # in pause. - # - def resume_process (wfid) - - wfid = extract_wfid(wfid) - - root_expression = fetch_root(wfid) - - # - # remove 'paused' flag - - root_expression.unset_variable(VAR_PAUSED) - - # - # replay - - journal = get_error_journal - - # select PausedError instances in separate list - - errors = journal.get_error_log(wfid) - error_class = PausedError.name - paused_errors = errors.select { |e| e.error_class == error_class } - - return if paused_errors.size < 1 - - # remove them from current error journal - - journal.remove_errors wfid, paused_errors - - # replay select PausedError instances - - paused_errors.each do |e| - journal.replay_at_error e - end - end - - # # Replies to the parent of the given expression. # def reply_to_parent (exp, workitem, remove=true) ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } @@ -536,20 +490,20 @@ # # Adds or updates a flow expression in this pool # def update (flow_expression) - #ldebug { "update() for #{flow_expression.fei.to_debug_s}" } + ldebug { "update() for #{flow_expression.fei.to_debug_s}" } - t = Timer.new + #t = Timer.new onotify :update, flow_expression.fei, flow_expression - ldebug do - "update() took #{t.duration} ms " + - "#{flow_expression.fei.to_debug_s}" - end + #ldebug do + # "update() took #{t.duration} ms " + + # "#{flow_expression.fei.to_debug_s}" + #end flow_expression end # @@ -560,20 +514,25 @@ # has to be reloaded. # def fetch (exp) synchronize do - fei = exp - #ldebug { "fetch() exp is of kind #{exp.class}" } - if exp.kind_of?(FlowExpression) - fei = exp.fei + fei = if exp.kind_of?(FlowExpression) + + exp.fei + elsif not exp.kind_of?(FlowExpressionId) + raise \ "Cannot fetch expression with key : "+ "'#{fei}' (#{fei.class})" + + else + + exp end #ldebug { "fetch() for #{fei.to_debug_s}" } [ get_expression_storage()[fei], fei ] @@ -586,20 +545,22 @@ # The param 'exp' may be a FlowExpressionId or a FlowExpression that # has to be reloaded. # def fetch_expression (exp) - exp, _fei = fetch(exp) + exp, fei = fetch(exp) exp end # # Fetches the root expression of a process (given any of its # expressions or its wfid). # def fetch_root (exp_or_wfid) + ldebug { "fetch_root() '#{exp_or_wfid.to_s}'" } + return fetch_expression_with_wfid(exp_or_wfid) \ if exp_or_wfid.is_a?(String) exp = fetch_expression(exp_or_wfid) @@ -636,11 +597,11 @@ # (This method is mainly called from the pool itself) # def remove (exp) exp, _fei = fetch(exp) \ - if exp.kind_of?(FlowExpressionId) + if exp.is_a?(FlowExpressionId) return unless exp ldebug { "remove() fe #{exp.fei.to_debug_s}" } @@ -668,16 +629,16 @@ t = OpenWFE::Timer.new linfo { "reschedule() initiating..." } - get_expression_storage.each_of_kind(Schedulable) do |fe| + get_expression_storage.each_of_kind(Schedulable) do |fei, fe| - #linfo { "reschedule() for #{fe.fei.to_debug_s}..." } - linfo { "reschedule() for #{fe.fei.to_s}..." } + #linfo { "reschedule() for #{fei.to_debug_s}..." } + linfo { "reschedule() for #{fei.to_s}..." } - onotify :reschedule, fe.fei + onotify :reschedule, fei fe.reschedule(get_scheduler) end linfo { "reschedule() done. (took #{t.duration} ms)" } @@ -760,12 +721,14 @@ # collect() would look better get_expression_storage.real_each(wfid_prefix) do |fei, fexp| - next unless fexp.is_a? DefineExpression + #ldebug { "list_processes() class is #{fexp.class.name}" } + next unless fexp.is_a?(DefineExpression) + next if not consider_subprocesses and fei.wfid.index(".") #next unless fei.wfid.match("^#{wfid_prefix}") if wfid_prefix result << fexp @@ -777,11 +740,19 @@ # # Returns the first expression found with the given wfid. # def fetch_expression_with_wfid (wfid) - list_processes(false, wfid)[0] + #list_processes(false, wfid)[0] + + ps = list_processes(false, wfid) + + ldebug do + "fetch_expression_with_wfid() '#{wfid}' found #{ps.size} items" + end + + ps[0] end # # This method is called when apply() or reply() failed for # an expression. @@ -849,35 +820,33 @@ # # The real apply work. # def do_apply (exp, workitem) - exp, fei = fetch(exp) if exp.kind_of?(FlowExpressionId) + exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId) check_if_paused exp - #ldebug { "apply() '#{fei}' (#{fei.class})" } + #ldebug { "apply() '#{_fei}'" } if not exp - lwarn { "apply() cannot apply missing #{fei.to_debug_s}" } + lwarn do + "do_apply() cannot apply missing #{_fei.to_debug_s}" + end + return - #raise "apply() cannot apply missing #{fei.to_debug_s}" + #raise "apply() cannot apply missing #{_fei.to_debug_s}" + # not very helpful anyway end - #ldebug { "apply() #{fei.to_debug_s}" } - - #exp.apply_time = OpenWFE::now() - # - # this is done in RawExpression - workitem.flow_expression_id = exp.fei onotify :apply, exp, workitem - exp.apply(workitem) + exp.apply workitem end # # The real reply work is done here # @@ -939,14 +908,15 @@ att = if oat { "until" => oat } else #oin { "for" => oin } end + att["scheduler-tags"] = "scheduled-launch" sle = get_expression_map.get_class(:sleep) sle = sle.new(fei.dup, fei, nil, application_context, att) - sle.fei.expression_id = "0.1" + sle.fei.expression_id = "0.0" sle.fei.expression_name = "sleep" seq.children << sle.fei seq.children << raw_expression.fei seq.new_environment @@ -966,10 +936,11 @@ { "tab" => ocron } else #oevery { "every" => oevery } end att["name"] = "//cron_launch__#{fei.wfid}" + att["scheduler-tags"] = "scheduled-launch" cro = get_expression_map.get_class(:cron) cro = cro.new(fei, nil, nil, application_context, att) cro.children << raw_expression.fei @@ -1126,10 +1097,10 @@ # The param can be a template or a definition (anything # accepted by the determine_representation() method). # def build_raw_expression (launchitem, param) - procdef = determine_representation(param) + procdef = determine_representation param #return procdef if procdef.is_a? RawExpression flow_name = procdef.attributes['name'] flow_revision = procdef.attributes['revision']