lib/openwfe/engine/engine.rb in openwferu-0.9.13 vs lib/openwfe/engine/engine.rb in openwferu-0.9.14
- old
+ new
@@ -141,10 +141,11 @@
# If this method is called too soon, missing participants will
# cause trouble... Call this method after all the participants
# have been added.
#
def reschedule
+
get_expression_pool.reschedule()
end
alias :reload :reschedule
@@ -156,10 +157,11 @@
#
# Note that the launch method will raise those exceptions as well.
# This method can be useful in some scenarii though.
#
def pre_launch_check (launchitem)
+
get_expression_pool.prepare_raw_expression(launchitem)
end
#
# Launches a [business] process.
@@ -172,36 +174,33 @@
# (Ruby process definition).
#
# Returns the FlowExpressionId instance of the expression at the
# root of the newly launched process.
#
- def launch (launch_object)
+ # Options for scheduled launches like :at, :in and :cron are accepted
+ # via the 'options' optional parameter.
+ # For example :
+ #
+ # engine.launch(launch_item)
+ # # will launch immediately
+ #
+ # engine.launch(launch_item, :in => "1d20m")
+ # # will launch in one day and twenty minutes
+ #
+ # engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
+ # # will launch at that point in time
+ #
+ # engine.launch(launch_item, :cron => "0 5 * * *")
+ # # will launch that same process every day,
+ # # five minutes after midnight (see "man 5 crontab")
+ #
+ def launch (launch_object, options={})
- launchitem = if launch_object.kind_of?(OpenWFE::LaunchItem)
+ launchitem = extract_launchitem launch_object
- launch_object
+ fei = get_expression_pool.launch launchitem, options
- elsif launch_object.kind_of?(Class)
-
- LaunchItem.new launch_object
-
- elsif launch_object.kind_of? String
-
- li = OpenWFE::LaunchItem.new
-
- if launch_object[0, 1] == '<'
- li.workflowDefinitionUrl = "field:__definition"
- li['definition'] = launch_object
- else
- li.workflowDefinitionUrl = launch_object
- end
-
- li
- end
-
- fei = get_expression_pool.launch launchitem
-
fei.dup
#
# so that users of this launch() method can play with their
# fei without breaking things
end
@@ -229,10 +228,13 @@
"engine.reply() " +
"cannot handle instances of #{workitem.class}"
end
end
+ alias :forward :reply
+ alias :proceed :reply
+
#
# Registers a participant in this [embedded] engine.
# This method is a shortcut to the ParticipantMap method
# with the same name.
#
@@ -420,22 +422,22 @@
end
t = Thread.new { Thread.stop }
to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
- t.wakeup if fe.fei.workflow_instance_id == wfid
+ t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
end
- te = get_expression_pool.add_observer(:error) do |c, fei, m, wi, se|
- t.wakeup if fei.parent_wfid == wfid
+ te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
+ t.wakeup if (fei.parent_wfid == wfid and t.alive?)
end
- ldebug { "wait_for() #{wfid}" }
+ linfo { "wait_for() #{wfid}" }
t.join
- get_expression_pool.remove_observer to, :terminate
- get_expression_pool.remove_observer te, :error
+ get_expression_pool.remove_observer(to, :terminate)
+ get_expression_pool.remove_observer(te, :error)
#
# it would work as well without specifying the channel,
# but it's thus a little bit faster
end
@@ -446,11 +448,11 @@
# A ProcessStatus is a description of the state of a process instance.
# It enumerates the expressions where the process is currently
# located (waiting certainly) and the errors the process currently
# has (hopefully none).
#
- def get_process_status (wfid=nil)
+ def list_process_status (wfid=nil)
wfid = to_wfid(wfid) if wfid
result = {}
@@ -473,10 +475,25 @@
end
result
end
+ #
+ # list_process_status() will be deprecated at release 1.0.0
+ #
+ alias :get_process_status :list_process_status
+
+ #
+ # Returns the process status of one given process instance.
+ #
+ def process_status (wfid)
+
+ wfid = to_wfid(wfid)
+
+ list_process_status(wfid).values[0]
+ end
+
#--
# METHODS FROM THE EXPRESSION POOL
#
# These methods are 'proxy' to method found in the expression pool.
# They are made available here for a simpler model.
@@ -544,10 +561,52 @@
def forget_expression (exp_or_fei)
get_expression_pool.forget(exp_or_fei)
end
+ #
+ # Pauses a process (sets its /__paused__ variable to true).
+ #
+ def pause_process (wfid)
+
+ get_expression_pool.pause_process(wfid)
+ end
+
+ #
+ # Restarts a process : removes its 'paused' flag (variable) and makes
+ # sure to 'replay' events (replies) that came for it while it was
+ # in pause.
+ #
+ def resume_process (wfid)
+
+ get_expression_pool.resume_process(wfid)
+ end
+
+ #
+ # Looks up a process variable in a process.
+ # If fei_or_wfid is not given, will simply look in the
+ # 'engine environment' (where the top level variables '//' do reside).
+ #
+ def lookup_variable (var_name, fei_or_wfid=nil)
+
+ return get_expression_pool.fetch_engine_environment[var_name] \
+ unless fei_or_wfid
+
+ exp = if fei_or_wfid.is_a?(String)
+
+ get_expression_pool.fetch_root(fei_or_wfid)
+
+ else
+
+ get_expression_pool.fetch_expression(fei_or_wfid)
+ end
+
+ raise "no expression found for '#{fei_or_wfid.to_s}'" unless exp
+
+ exp.lookup_variable var_name
+ end
+
protected
#--
# the following methods may get overridden upon extension
# see for example file_persisted_engine.rb
@@ -631,10 +690,42 @@
def build_error_journal
init_service(S_ERROR_JOURNAL, InMemoryErrorJournal)
end
+ #
+ # Turns the raw launch request info into a LaunchItem instance.
+ #
+ def extract_launchitem (launch_object)
+
+ if launch_object.kind_of?(OpenWFE::LaunchItem)
+
+ launch_object
+
+ elsif launch_object.kind_of?(Class)
+
+ LaunchItem.new launch_object
+
+ elsif launch_object.kind_of?(String)
+
+ li = OpenWFE::LaunchItem.new
+
+ #if launch_object[0, 1] == '<' or launch_object.match("\n")
+ if launch_object[0, 1] == '<' or launch_object.index("\n")
+
+ li.workflow_definition_url = "field:__definition"
+ li['__definition'] = launch_object
+
+ else
+
+ li.workflow_definition_url = launch_object
+ end
+
+ li
+ end
+ end
+
end
#
# ProcessStatus represents information about the status of a workflow
# process instance.
@@ -657,11 +748,11 @@
# concurrence, more than one expressions may be listed here.
#
attr_reader :expressions
#
- # a hash whose values are ProcessError instances, the keys
+ # A hash whose values are ProcessError instances, the keys
# are FlowExpressionId instances (fei) (identifying the expressions
# that are concerned with the error)
#
attr_reader :errors
@@ -670,10 +761,19 @@
@expressions = []
@errors = {}
end
#
+ # Returns true if the process is in pause.
+ #
+ def paused?
+
+ exp = @expressions[0]
+ exp != nil and exp.paused?
+ end
+
+ #
# this method is used by Engine.get_process_status() when
# it prepares its results.
#
def << (item)
@@ -693,11 +793,12 @@
s << " wfid : #{@wfid}\n"
s << " expressions :\n"
@expressions.each do |fexp|
s << " #{fexp.fei}\n"
end
- s << " errors : #{@errors.size}"
+ s << " errors : #{@errors.size}\n"
+ s << " paused : #{paused?}"
s
end
protected
@@ -749,12 +850,12 @@
# puts engine.get_process_status.to_s
#
def pretty_print_process_status (ps)
s = ""
- s << "process_id | name | rev | brn | err\n"
- s << "--------------------+-------------------+---------+-----+-----\n"
+ s << "process_id | name | rev | brn | err | paused? \n"
+ s << "--------------------+-------------------+---------+-----+-----+---------\n"
ps.keys.sort.each do |wfid|
status = ps[wfid]
fexp = status.expressions[0]
@@ -767,9 +868,11 @@
s << "%-7s" % ffei.workflow_definition_revision[0, 7]
s << " | "
s << "%3s" % status.expressions.size.to_s[0, 3]
s << " | "
s << "%3s" % status.errors.size.to_s[0, 3]
+ s << " | "
+ s << "%5s" % status.paused?.to_s
s << "\n"
end
s
end