lib/openwfe/expool/expressionpool.rb in openwferu-0.9.13 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.14
- old
+ new
@@ -88,15 +88,16 @@
# expressions.
#
class ExpressionPool
include ServiceMixin
include OwfeServiceLocator
- include Observable
+ include OwfeObservable
include WorkqueueMixin
include FeiMixin
include MonitorMixin
+
#
# code loaded from a remote URI will get evaluated with
# that security level
#
SAFETY_LEVEL = 2
@@ -111,10 +112,13 @@
@observers = {}
@stopped = false
+ engine_environment_id
+ # makes sure it's called now
+
start_workqueue
end
#
# Stops this expression pool (especially its workqueue).
@@ -135,11 +139,11 @@
# It avoids the need for the FlowExpression instances to include
# the monitor mixin by themselves
#
def get_monitor (fei)
- return @monitors[fei]
+ @monitors[fei]
end
#
# This method is called by the launch method. It's actually the first
# stage of that method.
@@ -184,12 +188,15 @@
# before the actual launch is completely over.
#
# Returns the FlowExpressionId instance of the root expression of
# the newly launched flow.
#
- def launch (launchitem)
+ def launch (launchitem, options)
+ #
+ # prepare raw expression
+
raw_expression = prepare_raw_expression launchitem
#
# will raise an exception if there are requirements
# and one of them is not met
@@ -197,14 +204,20 @@
#
# as this expression is the root of a new process instance,
# it has to have an environment for all the variables of
# the process instance
- wi = build_workitem launchitem
+ raw_expression = wrap_in_schedule(raw_expression, options) \
+ if options and options.size > 0
fei = raw_expression.fei
+ #
+ # apply prepared raw expression
+
+ wi = build_workitem launchitem
+
onotify :launch, fei, launchitem
apply raw_expression, wi
fei
@@ -218,36 +231,36 @@
# and of course used by the launch_template() method.
#
def prepare_from_template (
requesting_expression, sub_id, template, params=nil)
- rawexp = if template.is_a? RawExpression
+ rawexp = if template.is_a?(RawExpression)
template
- elsif template.is_a? FlowExpressionId
+ elsif template.is_a?(FlowExpressionId)
fetch_expression(template)
else
build_raw_expression(nil, template)
end
- #raise "did not find expression at #{template.to_s}" \
+ #raise "did not find subprocess in : #{template.to_s}" \
# unless rawexp
rawexp = rawexp.dup()
rawexp.fei = rawexp.fei.dup()
if requesting_expression == nil
rawexp.parent_id = nil
rawexp.fei.workflow_instance_id = get_wfid_generator.generate
- elsif requesting_expression.kind_of? FlowExpressionId
+ elsif requesting_expression.kind_of?(FlowExpressionId)
rawexp.parent_id = requesting_expression
rawexp.fei.workflow_instance_id = \
"#{requesting_expression.workflow_instance_id}.#{sub_id}"
- elsif requesting_expression.kind_of? String
+ elsif requesting_expression.kind_of?(String)
rawexp.parent_id = nil
rawexp.fei.workflow_instance_id = \
"#{requesting_expression}.#{sub_id}"
@@ -310,20 +323,18 @@
#
# Applies a given expression (id or expression)
#
def apply (exp, workitem)
- #do_apply exp, workitem
queue_work :do_apply, exp, workitem
end
#
# Replies to a given expression
#
def reply (exp, workitem)
- #do_reply exp, workitem
queue_work :do_reply, exp, workitem
end
#
# Cancels the given expression.
@@ -406,10 +417,62 @@
ldebug { "forget() forgot #{fei}" }
end
#
+ # Pauses a process (sets its '/__paused__' variable to true).
+ #
+ def pause_process (wfid)
+
+ wfid = extract_wfid(wfid)
+
+ root_expression = fetch_root(wfid)
+
+ root_expression.set_variable(VAR_PAUSED, true)
+ end
+
+ #
+ # Restarts a process : removes its 'paused' flag (variable) and makes
+ # sure to 'replay' events (replies) that came for it while it was
+ # in pause.
+ #
+ def resume_process (wfid)
+
+ wfid = extract_wfid(wfid)
+
+ root_expression = fetch_root(wfid)
+
+ #
+ # remove 'paused' flag
+
+ root_expression.unset_variable(VAR_PAUSED)
+
+ #
+ # replay
+
+ journal = get_error_journal
+
+ # select PausedError instances in separate list
+
+ errors = journal.get_error_log(wfid)
+ error_class = PausedError.name
+ paused_errors = errors.select { |e| e.error_class == error_class }
+
+ return if paused_errors.size < 1
+
+ # remove them from current error journal
+
+ journal.remove_errors wfid, paused_errors
+
+ # replay select PausedError instances
+
+ paused_errors.each do |e|
+ journal.replay_at_error e
+ end
+ end
+
+ #
# Replies to the parent of the given expression.
#
def reply_to_parent (exp, workitem, remove=true)
ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" }
@@ -550,10 +613,12 @@
#
# Returns the engine environment (the top level environment)
#
def fetch_engine_environment ()
synchronize do
+ #
+ # synchronize to ensure that there's 1! engine env
eei = engine_environment_id
ee, fei = fetch(eei)
if not ee
@@ -573,11 +638,11 @@
def remove (exp)
exp, _fei = fetch(exp) \
if exp.kind_of?(FlowExpressionId)
- return if not exp
+ return unless exp
ldebug { "remove() fe #{exp.fei.to_debug_s}" }
onotify :remove, exp.fei
@@ -601,47 +666,48 @@
synchronize do
t = OpenWFE::Timer.new
- ldebug { "reschedule() initiating..." }
+ linfo { "reschedule() initiating..." }
get_expression_storage.each_of_kind(Schedulable) do |fe|
- ldebug { "reschedule() for #{fe.fei.to_debug_s}..." }
+ #linfo { "reschedule() for #{fe.fei.to_debug_s}..." }
+ linfo { "reschedule() for #{fe.fei.to_s}..." }
onotify :reschedule, fe.fei
fe.reschedule(get_scheduler)
end
- ldebug { "reschedule() done. (took #{t.duration} ms)" }
+ linfo { "reschedule() done. (took #{t.duration} ms)" }
end
end
#
# Returns the unique engine_environment FlowExpressionId instance.
# There is only one such environment in an engine, hence this
# 'singleton' method.
#
def engine_environment_id ()
- synchronize do
+ #synchronize do
- return @eei if @eei
+ return @eei if @eei
- @eei = FlowExpressionId.new
- @eei.owfe_version = OPENWFERU_VERSION
- @eei.engine_id = get_engine.service_name
- @eei.initial_engine_id = @eei.engine_id
- @eei.workflow_definition_url = 'ee'
- @eei.workflow_definition_name = 'ee'
- @eei.workflow_definition_revision = '0'
- @eei.workflow_instance_id = '0'
- @eei.expression_name = EN_ENVIRONMENT
- @eei.expression_id = '0'
- @eei
- end
+ @eei = FlowExpressionId.new
+ @eei.owfe_version = OPENWFERU_VERSION
+ @eei.engine_id = get_engine.service_name
+ @eei.initial_engine_id = @eei.engine_id
+ @eei.workflow_definition_url = 'ee'
+ @eei.workflow_definition_name = 'ee'
+ @eei.workflow_definition_revision = '0'
+ @eei.workflow_instance_id = '0'
+ @eei.expression_name = EN_ENVIRONMENT
+ @eei.expression_id = '0'
+ @eei
+ #end
end
#
# Returns the list of applied expressions belonging to a given
# workflow instance.
@@ -714,51 +780,91 @@
def fetch_expression_with_wfid (wfid)
list_processes(false, wfid)[0]
end
+ #
+ # This method is called when apply() or reply() failed for
+ # an expression.
+ # There are currently only two 'users', the ParticipantExpression
+ # class and the do_process_workelement method of this ExpressionPool
+ # class.
+ #
+ def notify_error (error, fei, message, workitem)
+
+ fei = extract_fei fei
+ # densha requires that... :(
+
+ se = OpenWFE::exception_to_s(error)
+
+ onotify :error, fei, message, workitem, error.class.name, se
+
+ #fei = extract_fei fei
+
+ if error.is_a?(PausedError)
+ lwarn do
+ "#{self.service_name} " +
+ "operation :#{message.to_s} on #{fei.to_s} " +
+ "delayed because process '#{fei.wfid}' is in pause"
+ end
+ else
+ lwarn do
+ "#{self.service_name} " +
+ "operation :#{message.to_s} on #{fei.to_s} " +
+ "failed with\n" + se
+ end
+ end
+ end
+
protected
+ #--
+ # Returns true if it's the fei of a participant
+ # (or of a subprocess ref)
#
+ #def is_participant? (fei)
+ # exp_name = fei.expression_name
+ # return true if exp_name == "participant"
+ # (get_expression_map.get_class(exp_name) == nil)
+ #end
+ #++
+
+ #
# This method is called by the workqueue when processing
# the atomic work operations.
#
def do_process_workelement elt
+ message, fei, workitem = elt
+
begin
- message, fei, workitem = elt
send message, fei, workitem
rescue Exception => e
- se = OpenWFE::exception_to_s(e)
-
- onotify :error, fei, message, workitem, se
-
- fei = extract_fei fei
-
- lwarn do
- "#{self.service_name} " +
- "operation :#{message.to_s} on #{fei.to_s} " +
- "failed with\n" + se
- end
+ notify_error(e, fei, message, workitem)
end
end
#
# The real apply work.
#
def do_apply (exp, workitem)
- exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId
+ exp, fei = fetch(exp) if exp.kind_of?(FlowExpressionId)
+ check_if_paused exp
+
#ldebug { "apply() '#{fei}' (#{fei.class})" }
if not exp
+
lwarn { "apply() cannot apply missing #{fei.to_debug_s}" }
return
+
+ #raise "apply() cannot apply missing #{fei.to_debug_s}"
end
#ldebug { "apply() #{fei.to_debug_s}" }
#exp.apply_time = OpenWFE::now()
@@ -780,10 +886,12 @@
exp, fei = fetch(exp)
ldebug { "reply() to #{fei.to_debug_s}" }
ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" }
+ check_if_paused exp
+
if not exp
#raise "cannot reply to missing #{fei.to_debug_s}"
lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" }
return
end
@@ -792,19 +900,108 @@
exp.reply(workitem)
end
#
+ # Will raise an exception if the expression belongs to a paused
+ # process.
+ #
+ def check_if_paused (expression)
+
+ return unless expression
+
+ raise PausedError.new(expression.fei.wfid) \
+ if expression.paused?
+ end
+
+ #
+ # if the launch method is called with a schedule option
+ # (like :at, :in, :cron and :every), this method takes care of
+ # wrapping the process with a sleep or a cron.
+ #
+ def wrap_in_schedule (raw_expression, options)
+
+ oat = options[:at]
+ oin = options[:in]
+ ocron = options[:cron]
+ oevery = options[:every]
+
+ fei = new_fei(nil, "schedlaunch", "0", "sequence")
+
+ # not very happy with this code, it builds custom
+ # wrapping processes manually, maybe there is
+ # a more elegant way, but for now, it's ok.
+
+ if oat or oin
+
+ seq = get_expression_map.get_class(:sequence)
+ seq = seq.new(fei, nil, nil, application_context, nil)
+
+ att = if oat
+ { "until" => oat }
+ else #oin
+ { "for" => oin }
+ end
+
+ sle = get_expression_map.get_class(:sleep)
+ sle = sle.new(fei.dup, fei, nil, application_context, att)
+ sle.fei.expression_id = "0.1"
+ sle.fei.expression_name = "sleep"
+ seq.children << sle.fei
+ seq.children << raw_expression.fei
+
+ seq.new_environment
+ sle.environment_id = seq.environment_id
+
+ sle.store_itself
+ seq.store_itself
+
+ raw_expression.store_itself
+ raw_expression = seq
+
+ elsif ocron or oevery
+
+ fei.expression_name = "cron"
+
+ att = if ocron
+ { "tab" => ocron }
+ else #oevery
+ { "every" => oevery }
+ end
+ att["name"] = "//cron_launch__#{fei.wfid}"
+
+ cro = get_expression_map.get_class(:cron)
+ cro = cro.new(fei, nil, nil, application_context, att)
+
+ cro.children << raw_expression.fei
+
+ cro.new_environment
+
+ cro.store_itself
+
+ raw_expression.store_itself
+ raw_expression = cro
+ end
+ # else, don't schedule at all
+
+ raw_expression
+ end
+
+ #
# Removes an environment, especially takes care of unbinding
# any special value it may contain.
#
def remove_environment (environment_id)
ldebug { "remove_environment() #{environment_id.to_debug_s}" }
env, fei = fetch(environment_id)
+ return unless env
+ #
+ # env already unbound and removed
+
env.unbind
#get_expression_storage().delete(environment_id)
onotify :remove, environment_id
@@ -889,20 +1086,16 @@
# something that was dumped via YAML
#
# else it's some ruby code to eval
- o = OpenWFE::eval_safely(param, SAFETY_LEVEL)
-
- return o.do_make \
- if o.is_a?(ProcessDefinition) or o.is_a?(Class)
-
- o
+ ProcessDefinition::eval_ruby_process_definition(
+ param, SAFETY_LEVEL)
end
#
- # Builds a FlowExpressionId instance for process being
+ # Builds a FlowExpressionId instance for a process being
# launched.
#
def new_fei (launchitem, flow_name, flow_revision, exp_name)
url = if launchitem
@@ -952,10 +1145,45 @@
fei, nil, nil, @application_context, procdef)
end
end
#
+ # This error is raised when an expression belonging to a paused
+ # process is applied or replied to.
+ #
+ class PausedError < RuntimeError
+
+ attr_reader :wfid
+
+ def initialize (wfid)
+
+ super "process '#{wfid}' is paused"
+ @wfid = wfid
+ end
+
+ #
+ # Returns a hash for this PausedError instance.
+ # (simply returns the hash of the paused process' wfid).
+ #
+ def hash
+
+ @wfid.hash
+ end
+
+ #
+ # Returns true if the other is a PausedError issued for the
+ # same process instance (wfid).
+ #
+ def == (other)
+
+ return false unless other.is_a?(PausedError)
+
+ (@wfid == other.wfid)
+ end
+ end
+
+ #
# a small help class for storing monitors provided on demand
# to expressions that need them
#
class MonitorProvider
include MonitorMixin, Logging
@@ -970,18 +1198,9 @@
def [] (key)
synchronize do
(@monitors[key] ||= Monitor.new)
- #if not mon
- # #ldebug { "[] creating new Monitor for #{key}" }
- # mon = Monitor.new
- # @monitors[key] = mon
- #else
- # #ldebug { "[] already had Monitor for #{key}" }
- #end
- #
- #mon
end
end
def delete (key)
synchronize do