lib/openwfe/engine/engine.rb in openwferu-0.9.11 vs lib/openwfe/engine/engine.rb in openwferu-0.9.12
- old
+ new
@@ -41,19 +41,22 @@
#
require 'logger'
require 'fileutils'
-require 'openwfe/workitem'
+require 'openwfe/omixins'
require 'openwfe/rudefinitions'
require 'openwfe/service'
+require 'openwfe/workitem'
require 'openwfe/util/irb'
require 'openwfe/util/scheduler'
require 'openwfe/util/schedulers'
require 'openwfe/expool/wfidgen'
require 'openwfe/expool/expressionpool'
require 'openwfe/expool/expstorage'
+require 'openwfe/expool/errorjournal'
+require 'openwfe/expressions/environment'
require 'openwfe/expressions/expressionmap'
require 'openwfe/participants/participantmap'
module OpenWFE
@@ -62,10 +65,11 @@
# The simplest implementation of the OpenWFE workflow engine.
# No persistence is used, everything is stored in memory.
#
class Engine < Service
include OwfeServiceLocator
+ include FeiMixin
#
# Builds an OpenWFEru engine.
#
# Accepts an optional initial application_context (containing
@@ -80,29 +84,57 @@
super(S_ENGINE, application_context)
$OWFE_LOG = application_context[:logger]
unless $OWFE_LOG
+ #puts "Creating logs in " + FileUtils.pwd
FileUtils.mkdir("logs") unless File.exist?("logs")
- $OWFE_LOG = Logger.new("logs/openwferu.log")
+ $OWFE_LOG = Logger.new("logs/openwferu.log", 10, 1024000)
+ $OWFE_LOG.level = Logger::INFO
end
# build order matters.
#
# especially for the expstorage which 'observes' the expression
# pool and thus needs to be instantiated after it.
build_scheduler()
+ #
+ # for delayed or repetitive executions (it's the engine's clock)
+ # see http://openwferu.rubyforge.org/scheduler.html
build_expression_map()
+ #
+ # mapping expression names ('sequence', 'if', 'concurrence',
+ # 'when'...) to their implementations (SequenceExpression,
+ # IfExpression, ConcurrenceExpression, ...)
build_wfid_generator()
+ #
+ # the workflow instance (process instance) id generator
+ # making sure each process instance has a unique identifier
+
build_expression_pool()
+ #
+ # the core (hairy ball) of the engine
+
build_expression_storage()
+ #
+ # the engine persistence (persisting the expression instances
+ # that make up process instances)
build_participant_map()
+ #
+ # building the services that maps participant names to
+ # participant implementations / instances.
+ build_error_journal()
+ #
+ # builds the error journal (keeping track of failures
+ # in business process executions, and an opportunity to
+ # fix and replay)
+
linfo { "new() --- engine started --- #{self.object_id}" }
end
#
# Call this method once the participant for a persisted engine
@@ -134,10 +166,14 @@
# Launches a [business] process.
# The 'launch_object' param may contain either a LaunchItem instance,
# either a String containing the URL of the process definition
# to launch (with an empty LaunchItem created on the fly).
#
+ # The launch object can also be a String containing the XML process
+ # definition or directly a class extending OpenWFE::ProcessDefinition
+ # (Ruby process definition).
+ #
# Returns the FlowExpressionId instance of the expression at the
# root of the newly launched process.
#
def launch (launch_object)
@@ -161,11 +197,16 @@
end
li
end
- get_expression_pool.launch launchitem
+ fei = get_expression_pool.launch launchitem
+
+ fei.dup
+ #
+ # so that users of this launch() method can play with their
+ # fei without breaking things
end
#
# This method is used to feed a workitem back to the engine (after
# it got sent to a worklist or wherever by a participant).
@@ -291,10 +332,25 @@
get_scheduler.join
end
#
+ # Calling this method makes the control flow block until the
+ # workflow engine is inactive.
+ #
+ # TODO : implement idle_for
+ #
+ def join_until_idle ()
+
+ storage = get_expression_storage
+
+ while storage.size > 1
+ sleep 1
+ end
+ end
+
+ #
# Enabling the console means that hitting CTRL-C on the window /
# term / dos box / whatever does run the OpenWFEru engine will
# open an IRB interactive console for directly manipulating the
# engine instance.
#
@@ -344,129 +400,176 @@
nil
end
#
+ # Waits for a given process instance to terminate.
+ # The method only exits when the flow terminates, but beware : if
+ # the process already terminated, the method will never exit.
+ #
+ # The parameter can be a FlowExpressionId instance, for example the
+ # one given back by a launch(), or directly a workflow instance id
+ # (String).
+ #
+ # This method is mainly used in utests.
+ #
+ def wait_for (fei_or_wfid)
+
+ wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
+ fei_or_wfid.workflow_instance_id
+ else
+ fei_or_wfid
+ end
+
+ #Thread.pass
+ # #
+ # # let the flow 'stabilize' or progress before enquiring
+ #fexp = get_expression_pool.fetch_expression(fei)
+ #return unless fexp
+ #
+ # doesn't work well
+
+ t = Thread.new { Thread.stop }
+
+ get_expression_pool.add_observer(:terminate) do |channel, fe, wi|
+ t.wakeup if fe.fei.workflow_instance_id == wfid
+ end
+
+ ldebug { "wait_for() #{wfid}" }
+
+ t.join
+ end
+
+ #
+ # Returns a hash of ProcessStatus instances. The keys of the hash
+ # are workflow instance ids.
+ #
+ # A ProcessStatus is a description of the state of a process instance
+ # it enumerates the expressions where the process is currently
+ # located (waiting certainly) and the errors the process currently
+ # has (hopefully none).
+ #
+ def get_process_status (wfid=nil)
+
+ wfid = to_wfid(wfid) if wfid
+
+ result = {}
+
+ get_expression_storage.real_each(wfid) do |fei, fexp|
+ next if fexp.kind_of?(Environment)
+ next unless fexp.apply_time
+ (result[fei.parent_wfid] ||= ProcessStatus.new) << fexp
+ end
+
+ result.values.each do |ps|
+ get_error_journal.get_error_log(ps.wfid).each do |error|
+ ps << error
+ end
+ end
+
+ class << result
+ def to_s
+ pretty_print_process_status(self)
+ end
+ end
+
+ result
+ end
+
+ #--
# METHODS FROM THE EXPRESSION POOL
#
# These methods are 'proxy' to method found in the expression pool.
# They are made available here for a simpler model.
- #
+ #++
#
# Returns the list of applied expressions belonging to a given
# workflow instance.
# May be used to determine where a process instance currently is.
#
- def get_flow_position (workflow_instance_id)
- get_expression_pool.get_flow_position(workflow_instance_id)
+ # This method returns all the expressions (the stack) a process
+ # went through to reach its current state.
+ #
+ def get_process_stack (workflow_instance_id)
+
+ get_expression_pool.get_process_stack(workflow_instance_id)
end
- alias :get_process_position :get_flow_position
+ alias :get_flow_stack :get_process_stack
#
- # Lists all workflows (processes) currently in the expool (in
+ # Lists all workflow (process) instances currently in the expool (in
# the engine).
# This method will return a list of "process-definition" expressions
# (root of flows).
#
- # If consider_subprocesses is set to true, "process-definition"
- # expressions of subprocesses will be returned as well.
- #
# "wfid_prefix" allows your to query for specific workflow instance
# id prefixes.
#
- def list_workflows (consider_subprocesses=false, wfid_prefix=nil)
+ # If consider_subprocesses is set to true, "process-definition"
+ # expressions of subprocesses will be returned as well.
+ #
+ def list_processes (consider_subprocesses=false, wfid_prefix=nil)
- get_expression_pool.list_workflows(
+ get_expression_pool.list_processes(
consider_subprocesses, wfid_prefix)
end
- alias :list_processes :list_workflows
+ alias :list_workflows :list_processes
#
# Given any expression of a process, cancels the complete process
# instance.
#
- def cancel_flow (exp_or_wfid)
- get_expression_pool.cancel_flow(exp_or_wfid)
+ def cancel_process (exp_or_wfid)
+
+ get_expression_pool.cancel_process(exp_or_wfid)
end
- alias :cancel_process :cancel_flow
+ alias :cancel_flow :cancel_process
#
# Cancels the given expression (and its children if any)
# (warning : advanced method)
#
+ # Cancelling the root expression of a process is equivalent to
+ # cancelling the process.
+ #
def cancel_expression (exp_or_fei)
- get_expression_pool.cancel(exp_or_fei)
+
+ get_expression_pool.cancel_expression(exp_or_fei)
end
#
# Forgets the given expression (make it an orphan)
# (warning : advanced method)
#
def forget_expression (exp_or_fei)
+
get_expression_pool.forget(exp_or_fei)
end
- #
- # Waits for a given process instance to terminate.
- # The method only exits when the flow terminates, but beware : if
- # the process already terminated, the method will never exit.
- #
- # The parameter can be a FlowExpressionId instance, for example the
- # one given back by a launch(), or directly a workflow instance id
- # (String).
- #
- # This method is mainly used in utests.
- #
- def wait_for (fei_or_wfid)
-
- wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
- fei_or_wfid.workflow_instance_id
- else
- fei_or_wfid
- end
-
- #Thread.pass
- # #
- # # let the flow 'stabilize' or progress before enquiring
- #fexp = get_expression_pool.fetch_expression(fei)
- #return unless fexp
- #
- # doesn't work well
-
- t = Thread.new { Thread.stop }
-
- get_expression_pool.add_observer(:terminate) do |channel, fe, wi|
- t.wakeup if fe.fei.workflow_instance_id == wfid
- end
-
- ldebug { "wait_for() #{wfid}" }
-
- t.join
- end
-
protected
- #
+ #--
# the following methods may get overridden upon extension
# see for example file_persisted_engine.rb
- #
+ #++
def build_expression_map
@application_context[S_EXPRESSION_MAP] = ExpressionMap.new
#
# the expression map is not a Service anymore,
# it's a simple instance (that will be reused in other
# OpenWFEru components)
-
- #ldebug do
- # "build_expression_map() :\n" +
- # get_expression_map.to_s
- #end
end
+ #
+ # This implementation builds a KotobaWfidGenerator instance and
+ # binds it in the engine context.
+ # There are other WfidGeneration implementations available, like
+ # UuidWfidGenerator or FieldWfidGenerator.
+ #
def build_wfid_generator
#init_service S_WFID_GENERATOR, DefaultWfidGenerator
#init_service S_WFID_GENERATOR, UuidWfidGenerator
init_service S_WFID_GENERATOR, KotobaWfidGenerator
@@ -477,29 +580,199 @@
# showing how to initialize a FieldWfidGenerator that
# will take as workflow instance id the value found in
# the field "wfid" of the LaunchItem.
end
+ #
+ # Builds the OpenWFEru expression pool (the core of the engine)
+ # and binds it in the engine context.
+ # There is only one implementation of the expression pool, so
+ # this method is usually never overriden.
+ #
def build_expression_pool
init_service(S_EXPRESSION_POOL, ExpressionPool)
end
+ #
+ # The implementation here builds an InMemoryExpressionStorage
+ # instance.
+ #
+ # See FilePersistedEngine or CachedFilePersistedEngine for
+ # overrides of this method.
+ #
def build_expression_storage
init_service(S_EXPRESSION_STORAGE, InMemoryExpressionStorage)
end
+ #
+ # The ParticipantMap is a mapping between participant names
+ # (well rather regular expressions) and participant implementations
+ # (see http://openwferu.rubyforge.org/participants.html)
+ #
def build_participant_map
init_service(S_PARTICIPANT_MAP, ParticipantMap)
end
+ #
+ # There is only one Scheduler implementation, that's the one
+ # built and bound here.
+ #
def build_scheduler
init_service(S_SCHEDULER, SchedulerService)
end
+ #
+ # The default implementation of this method uses an
+ # InMemoryErrorJournal (do not use in production).
+ #
+ def build_error_journal
+
+ init_service(S_ERROR_JOURNAL, InMemoryErrorJournal)
+ end
+
+ end
+
+ #
+ # ProcessStatus gathers information about the status of a business
+ # process instance.
+ #
+ # The status is mainly a list of expressions and a hash of errors.
+ #
+ # Instances of this class are obtained via Engine.process_status().
+ #
+ class ProcessStatus
+
+ #
+ # the String workflow instance id of the Process.
+ #
+ attr_reader :wfid
+
+ #
+ # The list of the expressions currently active in the process instance.
+ #
+ # For instance, if your process definition is currently in a
+ # concurrence, more than one expressions may be listed here.
+ #
+ attr_reader :expressions
+
+ #
+ # a hash whose values are ProcessError instances, the keys
+ # are FlowExpressionId instances (fei) (identifying the expressions
+ # that are concerned with the error)
+ #
+ attr_reader :errors
+
+ def initialize
+ @wfid = nil
+ @expressions = []
+ @errors = {}
+ end
+
+ #
+ # this method is used by Engine.get_process_status() when
+ # it prepares its results.
+ #
+ def << (item)
+
+ if item.kind_of?(FlowExpression)
+ add_expression item
+ else
+ add_error item
+ end
+ end
+
+ #
+ # A String representation, handy for debugging, quick viewing.
+ #
+ def to_s
+ s = ""
+ s << "-- #{self.class.name} --\n"
+ s << " wfid : #{@wfid}\n"
+ s << " expressions :\n"
+ @expressions.each do |fexp|
+ s << " #{fexp.fei}\n"
+ end
+ s << " errors : #{@errors.size}"
+ s
+ end
+
+ protected
+
+ def add_expression (fexp)
+
+ set_wfid fexp.fei.parent_wfid
+
+ #@expressions << fexp
+
+ exps = @expressions
+ @expressions = []
+
+ added = false
+ @expressions = exps.collect do |fe|
+ if added or fe.fei.wfid != fexp.fei.wfid
+ fe
+ else
+ if OpenWFE::starts_with(fexp.fei.expid, fe.fei.expid)
+ added = true
+ fexp
+ elsif OpenWFE::starts_with(fe.fei.expid, fexp.fei.expid)
+ added = true
+ fe
+ else
+ fe
+ end
+ end
+ end
+ @expressions << fexp unless added
+ end
+
+ def add_error (error)
+ @errors[error.fei] = error
+ end
+
+ def set_wfid (wfid)
+
+ return if @wfid
+ @wfid = wfid
+ end
+ end
+
+ #
+ # Renders a nice, terminal oriented, representation of an
+ # Engine.get_process_status() result.
+ #
+ # You usually directly benefit from this when doing
+ #
+ # puts engine.get_process_status.to_s
+ #
+ def pretty_print_process_status (ps)
+
+ s = ""
+ s << "process_id | name | rev | brn | err\n"
+ s << "--------------------+-------------------+---------+-----+-----\n"
+
+ ps.keys.sort.each do |wfid|
+
+ status = ps[wfid]
+ fexp = status.expressions[0]
+ ffei = fexp.fei
+
+ s << "%-19s" % wfid[0, 19]
+ s << " | "
+ s << "%-17s" % ffei.workflow_definition_name[0, 17]
+ s << " | "
+ s << "%-7s" % ffei.workflow_definition_revision[0, 7]
+ s << " | "
+ s << "%3s" % status.expressions.size.to_s[0, 3]
+ s << " | "
+ s << "%3s" % status.errors.size.to_s[0, 3]
+ s << "\n"
+ end
+ s
end
end