lib/openwfe/expool/expressionpool.rb in openwferu-0.9.16 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.17
- old
+ new
@@ -1,8 +1,8 @@
#
#--
-# Copyright (c) 2006-2007, John Mettraux, OpenWFE.org
+# Copyright (c) 2006-2008, John Mettraux, OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
@@ -37,29 +37,25 @@
# John Mettraux at openwfe.org
#
require 'uri'
require 'monitor'
-require 'open-uri'
-require 'rexml/document'
require 'openwfe/utils'
require 'openwfe/service'
require 'openwfe/logging'
require 'openwfe/omixins'
require 'openwfe/rudefinitions'
require 'openwfe/flowexpressionid'
-require 'openwfe/util/lru'
-require 'openwfe/util/safe'
require 'openwfe/util/workqueue'
require 'openwfe/util/observable'
+require 'openwfe/expool/parser'
require 'openwfe/expressions/environment'
-require 'openwfe/expressions/raw_xml'
-require 'openwfe/expressions/raw_prog'
-require 'openwfe/expressions/simplerep'
+require 'openwfe/expressions/raw'
-include OpenWFE
+require 'rufus/lru' # gem 'rufus-lru'
+require 'rufus/verbs' # gem 'rufus-lru'
module OpenWFE
GONE = "gone"
@@ -95,21 +91,26 @@
include FeiMixin
include MonitorMixin
#
- # code loaded from a remote URI will get evaluated with
- # that security level
+ # The hash containing the wfid of the process instances currently
+ # paused.
#
- SAFETY_LEVEL = 2
+ attr_reader :paused_instances
+ #
+ # The constructor for the expression pool.
+ #
def initialize (service_name, application_context)
super()
-
+
service_init(service_name, application_context)
+ @paused_instances = {}
+
@monitors = MonitorProvider.new(application_context)
@observers = {}
@stopped = false
@@ -160,14 +161,16 @@
raise "launchitem.workflow_definition_url not set, cannot launch" \
unless wfdurl
definition = if wfdurl.match "^field:"
+
wfdfield = wfdurl[6..-1]
launchitem.attributes.delete wfdfield
else
- read_uri(wfdurl)
+
+ read_uri wfdurl
end
raise "didn't find process definition at '#{wfdurl}'" \
unless definition
@@ -198,11 +201,11 @@
raw_expression = prepare_raw_expression launchitem
#
# will raise an exception if there are requirements
# and one of them is not met
- raw_expression.new_environment()
+ raw_expression.new_environment
#
# as this expression is the root of a new process instance,
# it has to have an environment for all the variables of
# the process instance
@@ -229,11 +232,11 @@
#
# Used in the concurrent-iterator when building up the children list
# and of course used by the launch_template() method.
#
def prepare_from_template (
- requesting_expression, sub_id, template, params=nil)
+ requesting_expression, env_id, sub_id, template, params=nil)
rawexp = if template.is_a?(RawExpression)
template.application_context = @application_context
template
@@ -280,27 +283,38 @@
#ldebug do
# "launch_template() spawning wfid " +
# "#{rawexp.fei.workflow_instance_id.to_s}"
#end
- env = rawexp.new_environment params
+ if env_id
+
+ rawexp.environment_id = env_id
+ else
#
# the new scope gets its own environment
+ #
+ rawexp.new_environment params
+ end
rawexp.store_itself
rawexp
end
#
# launches a subprocess
#
def launch_template (
- requesting_expression, sub_id, template, workitem, params=nil)
+ requesting_expression,
+ env_id,
+ sub_id,
+ template,
+ workitem,
+ params=nil)
rawexp = prepare_from_template(
- requesting_expression, sub_id, template, params)
+ requesting_expression, env_id, sub_id, template, params)
workitem.flow_expression_id = rawexp.fei
onotify :launch_template, rawexp.fei, workitem
@@ -330,28 +344,30 @@
# Applies a given expression (id or expression)
#
def apply (exp, workitem)
queue_work :do_apply, exp, workitem
+ #do_apply exp, workitem
end
#
# Replies to a given expression
#
def reply (exp, workitem)
queue_work :do_reply, exp, workitem
+ #do_reply exp, workitem
end
#
# Cancels the given expression.
# The param might be an expression instance or a FlowExpressionId
# instance.
#
def cancel (exp)
- exp, fei = fetch(exp)
+ exp, fei = fetch exp
unless exp
ldebug { "cancel() cannot cancel missing #{fei.to_debug_s}" }
return nil
end
@@ -359,11 +375,11 @@
ldebug { "cancel() for #{fei.to_debug_s}" }
onotify :cancel, exp
inflowitem = exp.cancel()
- remove(exp)
+ remove exp
inflowitem
end
#
@@ -379,37 +395,42 @@
exp = fetch_expression exp
wi = cancel exp
if wi
- reply_to_parent(exp, wi, false)
+ reply_to_parent exp, wi, false
else
- parent_exp = fetch_expression(exp.parent_id)
+ parent_exp = fetch_expression exp.parent_id
parent_exp.remove_child(exp.fei) if parent_exp
end
end
#
# Given any expression of a process, cancels the complete process
# instance.
#
def cancel_process (exp_or_wfid)
- ldebug { "cancel_process() from #{exp_or_wfid}" }
+ wfid = extract_wfid exp_or_wfid, false
- root = fetch_root(exp_or_wfid)
- cancel(root)
+ ldebug { "cancel_process() '#{wfid}'" }
+
+ root = fetch_root wfid
+
+ raise "no process to cancel '#{wfid}'" unless root
+
+ cancel root
end
alias :cancel_flow :cancel_process
#
# Forgets the given expression (makes sure to substitute its
# parent_id with the GONE_PARENT_ID constant)
#
def forget (parent_exp, exp)
- exp, fei = fetch(exp)
+ exp, fei = fetch exp
#ldebug { "forget() forgetting #{fei}" }
return if not exp
@@ -516,15 +537,15 @@
def fetch (exp)
synchronize do
#ldebug { "fetch() exp is of kind #{exp.class}" }
- fei = if exp.kind_of?(FlowExpression)
+ fei = if exp.is_a?(FlowExpression)
exp.fei
- elsif not exp.kind_of?(FlowExpressionId)
+ elsif not exp.is_a?(FlowExpressionId)
raise \
"Cannot fetch expression with key : "+
"'#{fei}' (#{fei.class})"
@@ -545,56 +566,45 @@
# The param 'exp' may be a FlowExpressionId or a FlowExpression that
# has to be reloaded.
#
def fetch_expression (exp)
- exp, fei = fetch(exp)
+ exp, fei = fetch exp
exp
end
#
- # Fetches the root expression of a process (given any of its
- # expressions or its wfid).
- #
- def fetch_root (exp_or_wfid)
-
- ldebug { "fetch_root() '#{exp_or_wfid.to_s}'" }
-
- return fetch_expression_with_wfid(exp_or_wfid) \
- if exp_or_wfid.is_a?(String)
-
- exp = fetch_expression(exp_or_wfid)
-
- raise "did not find root for expression #{exp_or_wfid}" unless exp
-
- return exp unless exp.parent_id
-
- fetch_root(fetch_expression(exp.parent_id))
- end
-
- #
# Returns the engine environment (the top level environment)
#
- def fetch_engine_environment ()
+ def fetch_engine_environment
synchronize do
#
# synchronize to ensure that there's 1! engine env
eei = engine_environment_id
- ee, fei = fetch(eei)
+ ee, fei = fetch eei
- if not ee
- ee = Environment\
- .new(eei, nil, nil, @application_context, nil)
- ee.store_itself()
- end
+ return ee if ee
+ ee = Environment.new_env(
+ eei, nil, nil, @application_context, nil)
+
+ ee.store_itself
+
ee
end
end
#
+ # Fetches the root expression of a process (or a subprocess).
+ #
+ def fetch_root (wfid)
+
+ get_expression_storage.fetch_root wfid
+ end
+
+ #
# Removes a flow expression from the pool
# (This method is mainly called from the pool itself)
#
def remove (exp)
@@ -629,18 +639,19 @@
t = OpenWFE::Timer.new
linfo { "reschedule() initiating..." }
- get_expression_storage.each_of_kind(Schedulable) do |fei, fe|
+ options = { :include_classes => Rufus::Schedulable }
- #linfo { "reschedule() for #{fei.to_debug_s}..." }
- linfo { "reschedule() for #{fei.to_s}..." }
+ get_expression_storage.find_expressions(options).each do |fexp|
- onotify :reschedule, fei
+ linfo { "reschedule() for #{fexp.fei.to_s}..." }
- fe.reschedule(get_scheduler)
+ onotify :reschedule, fexp.fei
+
+ fexp.reschedule get_scheduler
end
linfo { "reschedule() done. (took #{t.duration} ms)" }
end
end
@@ -671,91 +682,49 @@
#
# Returns the list of applied expressions belonging to a given
# workflow instance.
#
- def get_process_stack (wfid)
+ # If the unapplied optional parameter is set to true, all the
+ # expressions (even those not yet applied) that compose the process
+ # instance will be returned. Environments will be returned as well.
+ #
+ def process_stack (wfid, unapplied=false)
- raise "please provide a non-nil workflow instance id" \
- unless wfid
+ #raise "please provide a non-nil workflow instance id" \
+ # unless wfid
- wfid = to_wfid wfid
+ wfid = extract_wfid wfid, true
- result = []
+ params = {
+ #:exclude_classes => [ Environment, RawExpression ],
+ #:exclude_classes => [ Environment ],
+ :parent_wfid => wfid
+ }
+ params[:applied] = true if (not unapplied)
- get_expression_storage.real_each do |fei, fexp|
-
- next if fexp.kind_of?(Environment)
- next if fexp.kind_of?(RawExpression)
- next unless fexp.apply_time
-
- next if fei.parent_wfid != wfid
-
- result << fexp
- end
-
- ldebug do
- "process_stack() " +
- "found #{result.size} exps for flow #{wfid}"
- end
-
- result
+ get_expression_storage.find_expressions params
end
- alias :get_flow_stack :get_process_stack
-
#
# Lists all workflows (processes) currently in the expool (in
# the engine).
# This method will return a list of "process-definition" expressions
# (root of flows).
#
- # If consider_subprocesses is set to true, "process-definition"
- # expressions of subprocesses will be returned as well.
- #
- # "wfid_prefix" allows your to query for specific workflow instance
- # id prefixes.
- #
- def list_processes (consider_subprocesses=false, wfid_prefix=nil)
+ def list_processes (options={})
- result = []
+ options[:include_classes] = DefineExpression
+ #
+ # Maybe it would be better to list root expressions instead
+ # so that expressions like 'sequence' can be used
+ # as root expressions. Later...
- # collect() would look better
-
- get_expression_storage.real_each(wfid_prefix) do |fei, fexp|
-
- #ldebug { "list_processes() class is #{fexp.class.name}" }
-
- next unless fexp.is_a?(DefineExpression)
-
- next if not consider_subprocesses and fei.wfid.index(".")
-
- #next unless fei.wfid.match("^#{wfid_prefix}") if wfid_prefix
-
- result << fexp
- end
-
- result
+ get_expression_storage.find_expressions options
end
#
- # Returns the first expression found with the given wfid.
- #
- def fetch_expression_with_wfid (wfid)
-
- #list_processes(false, wfid)[0]
-
- ps = list_processes(false, wfid)
-
- ldebug do
- "fetch_expression_with_wfid() '#{wfid}' found #{ps.size} items"
- end
-
- ps[0]
- end
-
- #
# This method is called when apply() or reply() failed for
# an expression.
# There are currently only two 'users', the ParticipantExpression
# class and the do_process_workelement method of this ExpressionPool
# class.
@@ -763,11 +732,11 @@
def notify_error (error, fei, message, workitem)
fei = extract_fei fei
# densha requires that... :(
- se = OpenWFE::exception_to_s(error)
+ se = OpenWFE::exception_to_s error
onotify :error, fei, message, workitem, error.class.name, se
#fei = extract_fei fei
@@ -784,23 +753,46 @@
"failed with\n" + se
end
end
end
+ #
+ # Gets the process definition (if necessary) and turns into
+ # into an expression tree (for storing into a RawExpression).
+ #
+ def determine_rep (param)
+
+ param = read_uri(param) if param.is_a?(URI)
+
+ DefParser.parse param
+ end
+
protected
- #--
- # Returns true if it's the fei of a participant
- # (or of a subprocess ref)
#
- #def is_participant? (fei)
- # exp_name = fei.expression_name
- # return true if exp_name == "participant"
- # (get_expression_map.get_class(exp_name) == nil)
- #end
- #++
+ # This is the only point in the expression pool where an URI
+ # is read, so this is where the :remote_definitions_allowed
+ # security check is enforced.
+ #
+ def read_uri (uri)
+ uri = URI.parse uri.to_s
+
+ raise "loading remote definitions is not allowed" \
+ if (ac[:remote_definitions_allowed] != true and
+ uri.scheme and
+ uri.scheme != 'file')
+
+ #open(uri.to_s).read
+
+ f = Rufus::Verbs.fopen uri
+ result = f.read
+ f.close if f.respond_to?(:close)
+
+ result
+ end
+
#
# This method is called by the workqueue when processing
# the atomic work operations.
#
def do_process_workelement elt
@@ -811,37 +803,36 @@
send message, fei, workitem
rescue Exception => e
- notify_error(e, fei, message, workitem)
+ notify_error e, fei, message, workitem
end
end
#
# The real apply work.
#
def do_apply (exp, workitem)
exp, _fei = fetch(exp) if exp.is_a?(FlowExpressionId)
- check_if_paused exp
-
#ldebug { "apply() '#{_fei}'" }
if not exp
+ #raise "apply() cannot apply missing #{_fei.to_debug_s}"
+ # not very helpful anyway
+
lwarn do
"do_apply() cannot apply missing #{_fei.to_debug_s}"
end
-
return
-
- #raise "apply() cannot apply missing #{_fei.to_debug_s}"
- # not very helpful anyway
end
+ check_if_paused exp
+
workitem.flow_expression_id = exp.fei
onotify :apply, exp, workitem
exp.apply workitem
@@ -855,18 +846,22 @@
exp, fei = fetch(exp)
ldebug { "reply() to #{fei.to_debug_s}" }
ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" }
- check_if_paused exp
-
if not exp
+
#raise "cannot reply to missing #{fei.to_debug_s}"
- lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" }
+
+ lwarn do
+ "reply() cannot reply to missing #{fei.to_debug_s}"
+ end
return
end
+ check_if_paused exp
+
onotify :reply, exp, workitem
exp.reply(workitem)
end
@@ -874,14 +869,13 @@
# Will raise an exception if the expression belongs to a paused
# process.
#
def check_if_paused (expression)
- return unless expression
+ wfid = expression.fei.parent_wfid
- raise PausedError.new(expression.fei.wfid) \
- if expression.paused?
+ raise PausedError.new(wfid) if @paused_instances[wfid]
end
#
# if the launch method is called with a schedule option
# (like :at, :in, :cron and :every), this method takes care of
@@ -892,32 +886,36 @@
oat = options[:at]
oin = options[:in]
ocron = options[:cron]
oevery = options[:every]
- fei = new_fei(nil, "schedlaunch", "0", "sequence")
+ fei = new_fei nil, "schedlaunch", "0", "sequence"
# not very happy with this code, it builds custom
# wrapping processes manually, maybe there is
# a more elegant way, but for now, it's ok.
if oat or oin
- seq = get_expression_map.get_class(:sequence)
- seq = seq.new(fei, nil, nil, application_context, nil)
+ seq = get_expression_map.get_class :sequence
+ seq = seq.new_exp fei, nil, nil, application_context, nil
att = if oat
{ "until" => oat }
else #oin
{ "for" => oin }
end
att["scheduler-tags"] = "scheduled-launch"
- sle = get_expression_map.get_class(:sleep)
- sle = sle.new(fei.dup, fei, nil, application_context, att)
+ sle = get_expression_map.get_class :sleep
+
+ sle = sle.new_exp(
+ fei.dup, fei, nil, application_context, att)
+
sle.fei.expression_id = "0.0"
sle.fei.expression_name = "sleep"
+
seq.children << sle.fei
seq.children << raw_expression.fei
seq.new_environment
sle.environment_id = seq.environment_id
@@ -938,12 +936,12 @@
{ "every" => oevery }
end
att["name"] = "//cron_launch__#{fei.wfid}"
att["scheduler-tags"] = "scheduled-launch"
- cro = get_expression_map.get_class(:cron)
- cro = cro.new(fei, nil, nil, application_context, att)
+ cro = get_expression_map.get_class :cron
+ cro = cro.new_exp fei, nil, nil, application_context, att
cro.children << raw_expression.fei
cro.new_environment
@@ -990,82 +988,10 @@
wi
end
#
- # This is the only point in the expression pool where an URI
- # is read, so this is where the :remote_definitions_allowed
- # security check is enforced.
- #
- def read_uri (uri)
-
- uri = uri.to_s
- uri = uri[5..-1] if uri.match("^file:")
- uri = URI.parse(uri)
-
- if uri.scheme
- raise "loading remote definitions is not allowed" \
- if ac[:remote_definitions_allowed] != true
- end
-
- open(uri.to_s).read
- end
-
- #
- # The parameter to this method might be either a process
- # definition (in any form) or a LaunchItem.
- #
- # Will return a 'representation' (what is used to build
- # a RawExpression instance).
- #
- def determine_representation (param)
-
- #ldebug do
- # "determine_representation() from class #{param.class.name}"
- #end
-
- param = read_uri(param) if param.is_a?(URI)
-
- #ldebug do
- # "determine_representation() " +
- # "param of class #{param.class.name}"
- #end
-
- return param \
- if param.is_a?(SimpleExpRepresentation)
-
- return param.do_make \
- if param.is_a?(ProcessDefinition) or param.is_a?(Class)
-
- raise "cannot handle definition of class #{param.class.name}" \
- unless param.is_a? String
-
- if param[0, 1] == "<"
- #
- # XML definition
-
- xmlRoot = REXML::Document.new(param).root
- class << xmlRoot
- def raw_expression_class
- XmlRawExpression
- end
- end
- return xmlRoot
- end
-
- return YAML.load(s) if param.match("^--- .")
- #
- # something that was dumped via YAML
-
- #
- # else it's some ruby code to eval
-
- ProcessDefinition::eval_ruby_process_definition(
- param, SAFETY_LEVEL)
- end
-
- #
# Builds a FlowExpressionId instance for a process being
# launched.
#
def new_fei (launchitem, flow_name, flow_revision, exp_name)
@@ -1097,23 +1023,19 @@
# The param can be a template or a definition (anything
# accepted by the determine_representation() method).
#
def build_raw_expression (launchitem, param)
- procdef = determine_representation param
+ procdef = determine_rep param
- #return procdef if procdef.is_a? RawExpression
+ atts = procdef[1]
+ flow_name = atts['name'] || "noname"
+ flow_revision = atts['revision'] || "0"
+ exp_name = procdef.first
- flow_name = procdef.attributes['name']
- flow_revision = procdef.attributes['revision']
- exp_name = procdef.name
+ fei = new_fei launchitem, flow_name, flow_revision, exp_name
- fei = new_fei(launchitem, flow_name, flow_revision, exp_name)
-
- #puts procdef.raw_expression_class
- #puts procdef.raw_expression_class.public_methods
-
- procdef.raw_expression_class.new(
+ RawExpression.new_raw(
fei, nil, nil, @application_context, procdef)
end
end
#