lib/openwfe/expool/expressionpool.rb in openwferu-0.9.16 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.17

- old
+ new

@@ -1,8 +1,8 @@ # #-- -# Copyright (c) 2006-2007, John Mettraux, OpenWFE.org +# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # @@ -37,29 +37,25 @@ # John Mettraux at openwfe.org # require 'uri' require 'monitor' -require 'open-uri' -require 'rexml/document' require 'openwfe/utils' require 'openwfe/service' require 'openwfe/logging' require 'openwfe/omixins' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' -require 'openwfe/util/lru' -require 'openwfe/util/safe' require 'openwfe/util/workqueue' require 'openwfe/util/observable' +require 'openwfe/expool/parser' require 'openwfe/expressions/environment' -require 'openwfe/expressions/raw_xml' -require 'openwfe/expressions/raw_prog' -require 'openwfe/expressions/simplerep' +require 'openwfe/expressions/raw' -include OpenWFE +require 'rufus/lru' # gem 'rufus-lru' +require 'rufus/verbs' # gem 'rufus-lru' module OpenWFE GONE = "gone" @@ -95,21 +91,26 @@ include FeiMixin include MonitorMixin # - # code loaded from a remote URI will get evaluated with - # that security level + # The hash containing the wfid of the process instances currently + # paused. # - SAFETY_LEVEL = 2 + attr_reader :paused_instances + # + # The constructor for the expression pool. + # def initialize (service_name, application_context) super() - + service_init(service_name, application_context) + @paused_instances = {} + @monitors = MonitorProvider.new(application_context) @observers = {} @stopped = false @@ -160,14 +161,16 @@ raise "launchitem.workflow_definition_url not set, cannot launch" \ unless wfdurl definition = if wfdurl.match "^field:" + wfdfield = wfdurl[6..-1] launchitem.attributes.delete wfdfield else - read_uri(wfdurl) + + read_uri wfdurl end raise "didn't find process definition at '#{wfdurl}'" \ unless definition @@ -198,11 +201,11 @@ raw_expression = prepare_raw_expression launchitem # # will raise an exception if there are requirements # and one of them is not met - raw_expression.new_environment() + raw_expression.new_environment # # as this expression is the root of a new process instance, # it has to have an environment for all the variables of # the process instance @@ -229,11 +232,11 @@ # # Used in the concurrent-iterator when building up the children list # and of course used by the launch_template() method. # def prepare_from_template ( - requesting_expression, sub_id, template, params=nil) + requesting_expression, env_id, sub_id, template, params=nil) rawexp = if template.is_a?(RawExpression) template.application_context = @application_context template @@ -280,27 +283,38 @@ #ldebug do # "launch_template() spawning wfid " + # "#{rawexp.fei.workflow_instance_id.to_s}" #end - env = rawexp.new_environment params + if env_id + + rawexp.environment_id = env_id + else # # the new scope gets its own environment + # + rawexp.new_environment params + end rawexp.store_itself rawexp end # # launches a subprocess # def launch_template ( - requesting_expression, sub_id, template, workitem, params=nil) + requesting_expression, + env_id, + sub_id, + template, + workitem, + params=nil) rawexp = prepare_from_template( - requesting_expression, sub_id, template, params) + requesting_expression, env_id, sub_id, template, params) workitem.flow_expression_id = rawexp.fei onotify :launch_template, rawexp.fei, workitem @@ -330,28 +344,30 @@ # Applies a given expression (id or expression) # def apply (exp, workitem) queue_work :do_apply, exp, workitem + #do_apply exp, workitem end # # Replies to a given expression # def reply (exp, workitem) queue_work :do_reply, exp, workitem + #do_reply exp, workitem end # # Cancels the given expression. # The param might be an expression instance or a FlowExpressionId # instance. # def cancel (exp) - exp, fei = fetch(exp) + exp, fei = fetch exp unless exp ldebug { "cancel() cannot cancel missing #{fei.to_debug_s}" } return nil end @@ -359,11 +375,11 @@ ldebug { "cancel() for #{fei.to_debug_s}" } onotify :cancel, exp inflowitem = exp.cancel() - remove(exp) + remove exp inflowitem end # @@ -379,37 +395,42 @@ exp = fetch_expression exp wi = cancel exp if wi - reply_to_parent(exp, wi, false) + reply_to_parent exp, wi, false else - parent_exp = fetch_expression(exp.parent_id) + parent_exp = fetch_expression exp.parent_id parent_exp.remove_child(exp.fei) if parent_exp end end # # Given any expression of a process, cancels the complete process # instance. # def cancel_process (exp_or_wfid) - ldebug { "cancel_process() from #{exp_or_wfid}" } + wfid = extract_wfid exp_or_wfid, false - root = fetch_root(exp_or_wfid) - cancel(root) + ldebug { "cancel_process() '#{wfid}'" } + + root = fetch_root wfid + + raise "no process to cancel '#{wfid}'" unless root + + cancel root end alias :cancel_flow :cancel_process # # Forgets the given expression (makes sure to substitute its # parent_id with the GONE_PARENT_ID constant) # def forget (parent_exp, exp) - exp, fei = fetch(exp) + exp, fei = fetch exp #ldebug { "forget() forgetting #{fei}" } return if not exp @@ -516,15 +537,15 @@ def fetch (exp) synchronize do #ldebug { "fetch() exp is of kind #{exp.class}" } - fei = if exp.kind_of?(FlowExpression) + fei = if exp.is_a?(FlowExpression) exp.fei - elsif not exp.kind_of?(FlowExpressionId) + elsif not exp.is_a?(FlowExpressionId) raise \ "Cannot fetch expression with key : "+ "'#{fei}' (#{fei.class})" @@ -545,56 +566,45 @@ # The param 'exp' may be a FlowExpressionId or a FlowExpression that # has to be reloaded. # def fetch_expression (exp) - exp, fei = fetch(exp) + exp, fei = fetch exp exp end # - # Fetches the root expression of a process (given any of its - # expressions or its wfid). - # - def fetch_root (exp_or_wfid) - - ldebug { "fetch_root() '#{exp_or_wfid.to_s}'" } - - return fetch_expression_with_wfid(exp_or_wfid) \ - if exp_or_wfid.is_a?(String) - - exp = fetch_expression(exp_or_wfid) - - raise "did not find root for expression #{exp_or_wfid}" unless exp - - return exp unless exp.parent_id - - fetch_root(fetch_expression(exp.parent_id)) - end - - # # Returns the engine environment (the top level environment) # - def fetch_engine_environment () + def fetch_engine_environment synchronize do # # synchronize to ensure that there's 1! engine env eei = engine_environment_id - ee, fei = fetch(eei) + ee, fei = fetch eei - if not ee - ee = Environment\ - .new(eei, nil, nil, @application_context, nil) - ee.store_itself() - end + return ee if ee + ee = Environment.new_env( + eei, nil, nil, @application_context, nil) + + ee.store_itself + ee end end # + # Fetches the root expression of a process (or a subprocess). + # + def fetch_root (wfid) + + get_expression_storage.fetch_root wfid + end + + # # Removes a flow expression from the pool # (This method is mainly called from the pool itself) # def remove (exp) @@ -629,18 +639,19 @@ t = OpenWFE::Timer.new linfo { "reschedule() initiating..." } - get_expression_storage.each_of_kind(Schedulable) do |fei, fe| + options = { :include_classes => Rufus::Schedulable } - #linfo { "reschedule() for #{fei.to_debug_s}..." } - linfo { "reschedule() for #{fei.to_s}..." } + get_expression_storage.find_expressions(options).each do |fexp| - onotify :reschedule, fei + linfo { "reschedule() for #{fexp.fei.to_s}..." } - fe.reschedule(get_scheduler) + onotify :reschedule, fexp.fei + + fexp.reschedule get_scheduler end linfo { "reschedule() done. (took #{t.duration} ms)" } end end @@ -671,91 +682,49 @@ # # Returns the list of applied expressions belonging to a given # workflow instance. # - def get_process_stack (wfid) + # If the unapplied optional parameter is set to true, all the + # expressions (even those not yet applied) that compose the process + # instance will be returned. Environments will be returned as well. + # + def process_stack (wfid, unapplied=false) - raise "please provide a non-nil workflow instance id" \ - unless wfid + #raise "please provide a non-nil workflow instance id" \ + # unless wfid - wfid = to_wfid wfid + wfid = extract_wfid wfid, true - result = [] + params = { + #:exclude_classes => [ Environment, RawExpression ], + #:exclude_classes => [ Environment ], + :parent_wfid => wfid + } + params[:applied] = true if (not unapplied) - get_expression_storage.real_each do |fei, fexp| - - next if fexp.kind_of?(Environment) - next if fexp.kind_of?(RawExpression) - next unless fexp.apply_time - - next if fei.parent_wfid != wfid - - result << fexp - end - - ldebug do - "process_stack() " + - "found #{result.size} exps for flow #{wfid}" - end - - result + get_expression_storage.find_expressions params end - alias :get_flow_stack :get_process_stack - # # Lists all workflows (processes) currently in the expool (in # the engine). # This method will return a list of "process-definition" expressions # (root of flows). # - # If consider_subprocesses is set to true, "process-definition" - # expressions of subprocesses will be returned as well. - # - # "wfid_prefix" allows your to query for specific workflow instance - # id prefixes. - # - def list_processes (consider_subprocesses=false, wfid_prefix=nil) + def list_processes (options={}) - result = [] + options[:include_classes] = DefineExpression + # + # Maybe it would be better to list root expressions instead + # so that expressions like 'sequence' can be used + # as root expressions. Later... - # collect() would look better - - get_expression_storage.real_each(wfid_prefix) do |fei, fexp| - - #ldebug { "list_processes() class is #{fexp.class.name}" } - - next unless fexp.is_a?(DefineExpression) - - next if not consider_subprocesses and fei.wfid.index(".") - - #next unless fei.wfid.match("^#{wfid_prefix}") if wfid_prefix - - result << fexp - end - - result + get_expression_storage.find_expressions options end # - # Returns the first expression found with the given wfid. - # - def fetch_expression_with_wfid (wfid) - - #list_processes(false, wfid)[0] - - ps = list_processes(false, wfid) - - ldebug do - "fetch_expression_with_wfid() '#{wfid}' found #{ps.size} items" - end - - ps[0] - end - - # # This method is called when apply() or reply() failed for # an expression. # There are currently only two 'users', the ParticipantExpression # class and the do_process_workelement method of this ExpressionPool # class. @@ -763,11 +732,11 @@ def notify_error (error, fei, message, workitem) fei = extract_fei fei # densha requires that... :( - se = OpenWFE::exception_to_s(error) + se = OpenWFE::exception_to_s error onotify :error, fei, message, workitem, error.class.name, se #fei = extract_fei fei @@ -784,23 +753,46 @@ "failed with\n" + se end end end + # + # Gets the process definition (if necessary) and turns into + # into an expression tree (for storing into a RawExpression). + # + def determine_rep (param) + + param = read_uri(param) if param.is_a?(URI) + + DefParser.parse param + end + protected - #-- - # Returns true if it's the fei of a participant - # (or of a subprocess ref) # - #def is_participant? (fei) - # exp_name = fei.expression_name - # return true if exp_name == "participant" - # (get_expression_map.get_class(exp_name) == nil) - #end - #++ + # This is the only point in the expression pool where an URI + # is read, so this is where the :remote_definitions_allowed + # security check is enforced. + # + def read_uri (uri) + uri = URI.parse uri.to_s + + raise "loading remote definitions is not allowed" \ + if (ac[:remote_definitions_allowed] != true and + uri.scheme and + uri.scheme != 'file') + + #open(uri.to_s).read + + f = Rufus::Verbs.fopen uri + result = f.read + f.close if f.respond_to?(:close) + + result + end + # # This method is called by the workqueue when processing # the atomic work operations. # def do_process_workelement elt @@ -811,37 +803,36 @@ send message, fei, workitem rescue Exception => e - notify_error(e, fei, message, workitem) + notify_error e, fei, message, workitem end end # # The real apply work. # def do_apply (exp, workitem) exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId) - check_if_paused exp - #ldebug { "apply() '#{_fei}'" } if not exp + #raise "apply() cannot apply missing #{_fei.to_debug_s}" + # not very helpful anyway + lwarn do "do_apply() cannot apply missing #{_fei.to_debug_s}" end - return - - #raise "apply() cannot apply missing #{_fei.to_debug_s}" - # not very helpful anyway end + check_if_paused exp + workitem.flow_expression_id = exp.fei onotify :apply, exp, workitem exp.apply workitem @@ -855,18 +846,22 @@ exp, fei = fetch(exp) ldebug { "reply() to #{fei.to_debug_s}" } ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" } - check_if_paused exp - if not exp + #raise "cannot reply to missing #{fei.to_debug_s}" - lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" } + + lwarn do + "reply() cannot reply to missing #{fei.to_debug_s}" + end return end + check_if_paused exp + onotify :reply, exp, workitem exp.reply(workitem) end @@ -874,14 +869,13 @@ # Will raise an exception if the expression belongs to a paused # process. # def check_if_paused (expression) - return unless expression + wfid = expression.fei.parent_wfid - raise PausedError.new(expression.fei.wfid) \ - if expression.paused? + raise PausedError.new(wfid) if @paused_instances[wfid] end # # if the launch method is called with a schedule option # (like :at, :in, :cron and :every), this method takes care of @@ -892,32 +886,36 @@ oat = options[:at] oin = options[:in] ocron = options[:cron] oevery = options[:every] - fei = new_fei(nil, "schedlaunch", "0", "sequence") + fei = new_fei nil, "schedlaunch", "0", "sequence" # not very happy with this code, it builds custom # wrapping processes manually, maybe there is # a more elegant way, but for now, it's ok. if oat or oin - seq = get_expression_map.get_class(:sequence) - seq = seq.new(fei, nil, nil, application_context, nil) + seq = get_expression_map.get_class :sequence + seq = seq.new_exp fei, nil, nil, application_context, nil att = if oat { "until" => oat } else #oin { "for" => oin } end att["scheduler-tags"] = "scheduled-launch" - sle = get_expression_map.get_class(:sleep) - sle = sle.new(fei.dup, fei, nil, application_context, att) + sle = get_expression_map.get_class :sleep + + sle = sle.new_exp( + fei.dup, fei, nil, application_context, att) + sle.fei.expression_id = "0.0" sle.fei.expression_name = "sleep" + seq.children << sle.fei seq.children << raw_expression.fei seq.new_environment sle.environment_id = seq.environment_id @@ -938,12 +936,12 @@ { "every" => oevery } end att["name"] = "//cron_launch__#{fei.wfid}" att["scheduler-tags"] = "scheduled-launch" - cro = get_expression_map.get_class(:cron) - cro = cro.new(fei, nil, nil, application_context, att) + cro = get_expression_map.get_class :cron + cro = cro.new_exp fei, nil, nil, application_context, att cro.children << raw_expression.fei cro.new_environment @@ -990,82 +988,10 @@ wi end # - # This is the only point in the expression pool where an URI - # is read, so this is where the :remote_definitions_allowed - # security check is enforced. - # - def read_uri (uri) - - uri = uri.to_s - uri = uri[5..-1] if uri.match("^file:") - uri = URI.parse(uri) - - if uri.scheme - raise "loading remote definitions is not allowed" \ - if ac[:remote_definitions_allowed] != true - end - - open(uri.to_s).read - end - - # - # The parameter to this method might be either a process - # definition (in any form) or a LaunchItem. - # - # Will return a 'representation' (what is used to build - # a RawExpression instance). - # - def determine_representation (param) - - #ldebug do - # "determine_representation() from class #{param.class.name}" - #end - - param = read_uri(param) if param.is_a?(URI) - - #ldebug do - # "determine_representation() " + - # "param of class #{param.class.name}" - #end - - return param \ - if param.is_a?(SimpleExpRepresentation) - - return param.do_make \ - if param.is_a?(ProcessDefinition) or param.is_a?(Class) - - raise "cannot handle definition of class #{param.class.name}" \ - unless param.is_a? String - - if param[0, 1] == "<" - # - # XML definition - - xmlRoot = REXML::Document.new(param).root - class << xmlRoot - def raw_expression_class - XmlRawExpression - end - end - return xmlRoot - end - - return YAML.load(s) if param.match("^--- .") - # - # something that was dumped via YAML - - # - # else it's some ruby code to eval - - ProcessDefinition::eval_ruby_process_definition( - param, SAFETY_LEVEL) - end - - # # Builds a FlowExpressionId instance for a process being # launched. # def new_fei (launchitem, flow_name, flow_revision, exp_name) @@ -1097,23 +1023,19 @@ # The param can be a template or a definition (anything # accepted by the determine_representation() method). # def build_raw_expression (launchitem, param) - procdef = determine_representation param + procdef = determine_rep param - #return procdef if procdef.is_a? RawExpression + atts = procdef[1] + flow_name = atts['name'] || "noname" + flow_revision = atts['revision'] || "0" + exp_name = procdef.first - flow_name = procdef.attributes['name'] - flow_revision = procdef.attributes['revision'] - exp_name = procdef.name + fei = new_fei launchitem, flow_name, flow_revision, exp_name - fei = new_fei(launchitem, flow_name, flow_revision, exp_name) - - #puts procdef.raw_expression_class - #puts procdef.raw_expression_class.public_methods - - procdef.raw_expression_class.new( + RawExpression.new_raw( fei, nil, nil, @application_context, procdef) end end #