lib/openwfe/engine/engine.rb in openwferu-0.9.15 vs lib/openwfe/engine/engine.rb in openwferu-0.9.16
- old
+ new
@@ -84,50 +84,50 @@
$OWFE_LOG = application_context[:logger]
unless $OWFE_LOG
#puts "Creating logs in " + FileUtils.pwd
FileUtils.mkdir("logs") unless File.exist?("logs")
- $OWFE_LOG = Logger.new("logs/openwferu.log", 10, 1024000)
+ $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
$OWFE_LOG.level = Logger::INFO
end
# build order matters.
#
# especially for the expstorage which 'observes' the expression
# pool and thus needs to be instantiated after it.
- build_scheduler()
+ build_scheduler
#
# for delayed or repetitive executions (it's the engine's clock)
# see http://openwferu.rubyforge.org/scheduler.html
- build_expression_map()
+ build_expression_map
#
# mapping expression names ('sequence', 'if', 'concurrence',
# 'when'...) to their implementations (SequenceExpression,
# IfExpression, ConcurrenceExpression, ...)
- build_wfid_generator()
+ build_wfid_generator
#
# the workflow instance (process instance) id generator
# making sure each process instance has a unique identifier
- build_expression_pool()
+ build_expression_pool
#
# the core (hairy ball) of the engine
- build_expression_storage()
+ build_expression_storage
#
# the engine persistence (persisting the expression instances
# that make up process instances)
- build_participant_map()
+ build_participant_map
#
# building the services that maps participant names to
# participant implementations / instances.
- build_error_journal()
+ build_error_journal
#
# builds the error journal (keeping track of failures
# in business process executions, and an opportunity to
# fix and replay)
@@ -210,26 +210,54 @@
# it got sent to a worklist or wherever by a participant).
# Participant implementations themselves do call this method usually.
#
# This method also accepts LaunchItem instances.
#
+ # Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem
+ # that don't belong to a process instance (ie whose flow_expression_id
+ # is nil). It will simply notify the participant_map of the reply
+ # for the given participant_name. If there is no participant_name
+ # specified for this orphan workitem, an exception will be raised.
+ #
def reply (workitem)
- if workitem.kind_of?(InFlowWorkItem)
+ if workitem.is_a?(InFlowWorkItem)
- get_expression_pool.reply workitem.flow_expression_id, workitem
+ if workitem.flow_expression_id
+ #
+ # vanilla case, workitem coming back
+ # (from listener probably)
- elsif workitem.kind_of?(LaunchItem)
+ return get_expression_pool.reply(
+ workitem.flow_expression_id, workitem)
+ end
- get_expression_pool.launch workitem
+ if workitem.participant_name
+ #
+ # a workitem that doesn't belong to a process instance
+ # but bears a participant name.
+ # Notify, there may be something listening on
+ # this channel (see the 'listen' expression).
- else
+ return get_participant_map.onotify(
+ workitem.participant_name, :reply, workitem)
+ end
raise \
- "engine.reply() " +
- "cannot handle instances of #{workitem.class}"
+ "InFlowWorkitem doesn't belong to a process instance" +
+ " nor to a participant"
end
+
+ return get_expression_pool.launch(workitem) \
+ if workitem.is_a?(LaunchItem)
+ #
+ # launchitem coming from listener
+ # let's attempt to launch a new process instance
+
+ raise \
+ "engine.reply() " +
+ "cannot handle instances of #{workitem.class}"
end
alias :forward :reply
alias :proceed :reply
@@ -252,20 +280,20 @@
# of handling workitems for that name.
# May be useful in some embedded contexts.
#
def get_participant (participant_name)
- get_participant_map.lookup_participant(participant_name)
+ get_participant_map.lookup_participant participant_name
end
#
# Removes the first participant matching the given name from the
# participant map kept by the engine.
#
def unregister_participant (participant_name)
- get_participant_map.unregister_participant(participant_name)
+ get_participant_map.unregister_participant participant_name
end
#
# Adds a workitem listener to this engine.
#
@@ -566,24 +594,68 @@
#
# Pauses a process (sets its /__paused__ variable to true).
#
def pause_process (wfid)
- get_expression_pool.pause_process(wfid)
+ wfid = extract_wfid(wfid)
+
+ root_expression = get_expression_pool.fetch_root(wfid)
+
+ root_expression.set_variable(VAR_PAUSED, true)
end
#
# Restarts a process : removes its 'paused' flag (variable) and makes
# sure to 'replay' events (replies) that came for it while it was
# in pause.
#
def resume_process (wfid)
- get_expression_pool.resume_process(wfid)
+ wfid = extract_wfid wfid
+
+ root_expression = get_expression_pool.fetch_root wfid
+
+ #
+ # remove 'paused' flag
+
+ root_expression.unset_variable(VAR_PAUSED)
+
+ #
+ # replay
+ #
+ # select PausedError instances in separate list
+
+ errors = get_error_journal.get_error_log wfid
+ error_class = PausedError.name
+ paused_errors = errors.select { |e| e.error_class == error_class }
+
+ return if paused_errors.size < 1
+
+ # replay select PausedError instances
+
+ paused_errors.each do |e|
+ replay_at_error e
+ end
end
#
+ # Takes care of removing an error from the error journal and
+ # they replays its process at that point.
+ #
+ def replay_at_error (error)
+
+ get_error_journal.remove_errors(
+ error.fei.parent_wfid,
+ error)
+
+ get_expression_pool.queue_work(
+ error.message,
+ error.fei,
+ error.workitem)
+ end
+
+ #
# Looks up a process variable in a process.
# If fei_or_wfid is not given, will simply look in the
# 'engine environment' (where the top level variables '//' do reside).
#
def lookup_variable (var_name, fei_or_wfid=nil)
@@ -603,10 +675,50 @@
raise "no expression found for '#{fei_or_wfid.to_s}'" unless exp
exp.lookup_variable var_name
end
+ #
+ # Returns an array of wfid (workflow instance ids) whose root
+ # environment containes the given variable
+ #
+ # If there are no matches, an empty array will be returned.
+ #
+ # Regular expressions are accepted as values.
+ #
+ # If no value is given, all processes with the given variable name
+ # set will be returned.
+ #
+ def lookup_processes (var_name, value=nil)
+
+ # TODO : maybe this would be better in the ExpressionPool
+
+ result = []
+
+ regexp = if value
+ if value.is_a?(Regexp)
+ value
+ else
+ Regexp.compile(value.to_s)
+ end
+ else
+ nil
+ end
+
+ get_expression_storage.each_of_kind(Environment) do |fei, env|
+
+ val = env.variables[var_name]
+
+ next unless val
+ next if regexp and (not regexp.match(val))
+
+ result.push env.fei.wfid
+ end
+
+ result
+ end
+
protected
#--
# the following methods may get overridden upon extension
# see for example file_persisted_engine.rb
@@ -647,11 +759,11 @@
# There is only one implementation of the expression pool, so
# this method is usually never overriden.
#
def build_expression_pool
- init_service(S_EXPRESSION_POOL, ExpressionPool)
+ init_service S_EXPRESSION_POOL, ExpressionPool
end
#
# The implementation here builds an InMemoryExpressionStorage
# instance.
@@ -659,39 +771,39 @@
# See FilePersistedEngine or CachedFilePersistedEngine for
# overrides of this method.
#
def build_expression_storage
- init_service(S_EXPRESSION_STORAGE, InMemoryExpressionStorage)
+ init_service S_EXPRESSION_STORAGE, InMemoryExpressionStorage
end
#
# The ParticipantMap is a mapping between participant names
# (well rather regular expressions) and participant implementations
# (see http://openwferu.rubyforge.org/participants.html)
#
def build_participant_map
- init_service(S_PARTICIPANT_MAP, ParticipantMap)
+ init_service S_PARTICIPANT_MAP, ParticipantMap
end
#
# There is only one Scheduler implementation, that's the one
# built and bound here.
#
def build_scheduler
- init_service(S_SCHEDULER, SchedulerService)
+ init_service S_SCHEDULER, SchedulerService
end
#
# The default implementation of this method uses an
# InMemoryErrorJournal (do not use in production).
#
def build_error_journal
- init_service(S_ERROR_JOURNAL, InMemoryErrorJournal)
+ init_service S_ERROR_JOURNAL, InMemoryErrorJournal
end
#
# Turns the raw launch request info into a LaunchItem instance.
#
@@ -754,14 +866,20 @@
# are FlowExpressionId instances (fei) (identifying the expressions
# that are concerned with the error)
#
attr_reader :errors
+ #
+ # The time at which the process got launched.
+ #
+ attr_reader :launch_time
+
def initialize
@wfid = nil
@expressions = []
@errors = {}
+ @launch_time = nil
end
#
# Returns true if the process is in pause.
#
@@ -804,10 +922,10 @@
def add_expression (fexp)
set_wfid fexp.fei.parent_wfid
- #@expressions << fexp
+ @launch_time = fexp.apply_time if fexp.fei.expid == '0'
exps = @expressions
@expressions = []
added = false