#
#--
# Copyright (c) 2006-2008, John Mettraux, Nicolas Modrzyk OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# . Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# . Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
# Nicolas Modrzyk at openwfe.org
#
require 'logger'
require 'fileutils'
require 'rufus/scheduler' # gem 'rufus-scheduler'
require 'openwfe/omixins'
require 'openwfe/rudefinitions'
require 'openwfe/service'
require 'openwfe/workitem'
require 'openwfe/util/irb'
require 'openwfe/expool/wfidgen'
require 'openwfe/expool/expressionpool'
require 'openwfe/expool/expstorage'
require 'openwfe/expool/errorjournal'
require 'openwfe/engine/process_status'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/expressionmap'
require 'openwfe/participants/participantmap'
module OpenWFE
#
# The simplest implementation of the OpenWFE workflow engine.
# No persistence is used, everything is stored in memory.
#
class Engine < Service
include OwfeServiceLocator
include FeiMixin
include StatusMixin
#
# Builds an OpenWFEru engine.
#
# Accepts an optional initial application_context (containing
# initialization params for services for example).
#
# The engine itself uses one param :logger, used to define
# where all the log output for OpenWFEru should go.
# By default, this output goes to logs/openwferu.log
#
def initialize (application_context={})
super S_ENGINE, application_context
$OWFE_LOG = application_context[:logger]
unless $OWFE_LOG
#puts "Creating logs in " + FileUtils.pwd
FileUtils.mkdir("logs") unless File.exist?("logs")
$OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
$OWFE_LOG.level = Logger::INFO
end
# build order matters.
#
# especially for the expstorage which 'observes' the expression
# pool and thus needs to be instantiated after it.
build_scheduler
#
# for delayed or repetitive executions (it's the engine's clock)
# see http://openwferu.rubyforge.org/scheduler.html
build_expression_map
#
# mapping expression names ('sequence', 'if', 'concurrence',
# 'when'...) to their implementations (SequenceExpression,
# IfExpression, ConcurrenceExpression, ...)
build_wfid_generator
#
# the workflow instance (process instance) id generator
# making sure each process instance has a unique identifier
build_expression_pool
#
# the core (hairy ball) of the engine
build_expression_storage
#
# the engine persistence (persisting the expression instances
# that make up process instances)
build_participant_map
#
# building the services that maps participant names to
# participant implementations / instances.
build_error_journal
#
# builds the error journal (keeping track of failures
# in business process executions, and an opportunity to
# fix and replay)
linfo { "new() --- engine started --- #{self.object_id}" }
end
#
# Call this method once the participants for a persisted engine
# have been [re]added.
#
# If this method is called too soon, missing participants will
# cause trouble... Call this method after all the participants
# have been added.
#
def reschedule
get_expression_pool.reschedule()
end
alias :reload :reschedule
#
# When 'parameters' are used at the top of a process definition, this
# method can be used to assert a launchitem before launch.
# An expression will be raised if the parameters do not match the
# requirements.
#
# Note that the launch method will raise those exceptions as well.
# This method can be useful in some scenarii though.
#
def pre_launch_check (launchitem)
get_expression_pool.prepare_raw_expression(launchitem)
end
#
# Launches a [business] process.
# The 'launch_object' param may contain either a LaunchItem instance,
# either a String containing the URL of the process definition
# to launch (with an empty LaunchItem created on the fly).
#
# The launch object can also be a String containing the XML process
# definition or directly a class extending OpenWFE::ProcessDefinition
# (Ruby process definition).
#
# Returns the FlowExpressionId instance of the expression at the
# root of the newly launched process.
#
# Options for scheduled launches like :at, :in and :cron are accepted
# via the 'options' optional parameter.
# For example :
#
# engine.launch(launch_item)
# # will launch immediately
#
# engine.launch(launch_item, :in => "1d20m")
# # will launch in one day and twenty minutes
#
# engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
# # will launch at that point in time
#
# engine.launch(launch_item, :cron => "0 5 * * *")
# # will launch that same process every day,
# # five minutes after midnight (see "man 5 crontab")
#
def launch (launch_object, options={})
launchitem = extract_launchitem launch_object
fei = get_expression_pool.launch launchitem, options
fei.dup
#
# so that users of this launch() method can play with their
# fei without breaking things
end
#
# This method is used to feed a workitem back to the engine (after
# it got sent to a worklist or wherever by a participant).
# Participant implementations themselves do call this method usually.
#
# This method also accepts LaunchItem instances.
#
# Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem
# that don't belong to a process instance (ie whose flow_expression_id
# is nil). It will simply notify the participant_map of the reply
# for the given participant_name. If there is no participant_name
# specified for this orphan workitem, an exception will be raised.
#
def reply (workitem)
if workitem.is_a?(InFlowWorkItem)
if workitem.flow_expression_id
#
# vanilla case, workitem coming back
# (from listener probably)
return get_expression_pool.reply(
workitem.flow_expression_id, workitem)
end
if workitem.participant_name
#
# a workitem that doesn't belong to a process instance
# but bears a participant name.
# Notify, there may be something listening on
# this channel (see the 'listen' expression).
return get_participant_map.onotify(
workitem.participant_name, :reply, workitem)
end
raise \
"InFlowWorkitem doesn't belong to a process instance" +
" nor to a participant"
end
return get_expression_pool.launch(workitem) \
if workitem.is_a?(LaunchItem)
#
# launchitem coming from listener
# let's attempt to launch a new process instance
raise \
"engine.reply() " +
"cannot handle instances of #{workitem.class}"
end
alias :forward :reply
alias :proceed :reply
#
# Registers a participant in this [embedded] engine.
# This method is a shortcut to the ParticipantMap method
# with the same name.
#
# engine.register_participant "user-.*", HashParticipant
#
# or
#
# engine.register_participant "user-.*" do |wi|
# puts "participant '#{wi.participant_name}' received a workitem"
# #
# # and did nothing with it
# # as a block participant implicitely returns the workitem
# # to the engine
# end
#
# Returns the participant instance.
#
# The participant parameter can be set to hash like in
#
# engine.register_participant(
# "alpha",
# { :participant => HashParticipant, :position => :first })
#
# or
#
# engine.register_participant("alpha", :position => :first) do
# puts "first !"
# end
#
# There are some times where you have to position a participant first
# (especially with the regex technique).
#
# see ParticipantMap#register_participant
#
def register_participant (regex, participant=nil, &block)
#get_participant_map.register_participant(
# regex, participant, &block)
params = if participant.class == Hash
participant
else
{ :participant => participant }
end
get_participant_map.register_participant regex, params, &block
end
#
# Given a participant name, returns the participant in charge
# of handling workitems for that name.
# May be useful in some embedded contexts.
#
def get_participant (participant_name)
get_participant_map.lookup_participant participant_name
end
#
# Removes the first participant matching the given name from the
# participant map kept by the engine.
#
def unregister_participant (participant_name)
get_participant_map.unregister_participant participant_name
end
#
# Adds a workitem listener to this engine.
#
# The 'freq' parameters if present might indicate how frequently
# the resource should be polled for incoming workitems.
#
# engine.add_workitem_listener(listener, "3m10s")
# # every 3 minutes and 10 seconds
#
# engine.add_workitem_listener(listener, "0 22 * * 1-5")
# # every weekday at 10pm
#
# TODO : block handling...
#
def add_workitem_listener (listener, freq=nil)
name = nil
if listener.kind_of? Class
listener = init_service nil, listener
name = listener.service_name
else
name = listener.name if listener.respond_to? :name
name = "#{listener.class}::#{listener.object_id}" unless name
@application_context[name] = listener
end
result = nil
if freq
freq = freq.to_s.strip
result = if Rufus::Scheduler.is_cron_string(freq)
get_scheduler.schedule(freq, listener)
else
get_scheduler.schedule_every(freq, listener)
end
end
linfo { "add_workitem_listener() added '#{name}'" }
result
end
#
# Makes the current thread join the engine's scheduler thread
#
# You can thus make an engine standalone with something like :
#
# require 'openwfe/engine/engine'
#
# the_engine = OpenWFE::Engine.new
# the_engine.join
#
# And you'll have to hit CTRL-C to make it stop.
#
def join
get_scheduler.join
end
#
# Calling this method makes the control flow block until the
# workflow engine is inactive.
#
# TODO : implement idle_for
#
def join_until_idle
storage = get_expression_storage
while storage.size > 1
sleep 1
end
end
#
# Enabling the console means that hitting CTRL-C on the window /
# term / dos box / whatever does run the OpenWFEru engine will
# open an IRB interactive console for directly manipulating the
# engine instance.
#
# Hit CTRL-D to get out of the console.
#
def enable_irb_console
OpenWFE::trap_int_irb(binding)
end
#--
# Makes sure that hitting CTRL-C will actually kill the engine VM and
# not open an IRB console.
#
#def disable_irb_console
# $openwfe_irb = nil
# trap 'INT' do
# exit 0
# end
#end
#++
#
# Stopping the engine will stop all the services in the
# application context.
#
def stop
linfo { "stop() stopping engine '#{@service_name}'" }
@application_context.each do |sname, service|
next if sname == self.service_name
#if service.kind_of?(ServiceMixin)
if service.respond_to?(:stop)
service.stop
linfo do
"stop() stopped service '#{sname}' (#{service.class})"
end
end
end
linfo { "stop() stopped engine '#{@service_name}'" }
nil
end
#
# Waits for a given process instance to terminate.
# The method only exits when the flow terminates, but beware : if
# the process already terminated, the method will never exit.
#
# The parameter can be a FlowExpressionId instance, for example the
# one given back by a launch(), or directly a workflow instance id
# (String).
#
# This method is mainly used in utests.
#
def wait_for (fei_or_wfid)
wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
fei_or_wfid.workflow_instance_id
else
fei_or_wfid
end
t = Thread.new { Thread.stop }
to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
end
te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
t.wakeup if (fei.parent_wfid == wfid and t.alive?)
end
#tc = get_expression_pool.add_observer(:cancel) do |c, fe|
# if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?)
# sleep 0.500
# t.wakeup
# end
#end
linfo { "wait_for() #{wfid}" }
t.join
get_expression_pool.remove_observer(to, :terminate)
get_expression_pool.remove_observer(te, :error)
#get_expression_pool.remove_observer(tc, :cancel)
#
# it would work as well without specifying the channel,
# but it's thus a little bit faster
end
#--
# METHODS FROM THE EXPRESSION POOL
#
# These methods are 'proxy' to method found in the expression pool.
# They are made available here for a simpler model.
#++
#
# Returns the list of applied expressions belonging to a given
# workflow instance.
# May be used to determine where a process instance currently is.
#
# This method returns all the expressions (the stack) a process
# went through to reach its current state.
#
# If the unapplied optional parameter is set to true, all the
# expressions (even those not yet applied) that compose the process
# instance will be returned.
#
def process_stack (workflow_instance_id, unapplied=false)
get_expression_pool.process_stack workflow_instance_id, unapplied
end
alias :get_process_stack :process_stack
alias :get_flow_stack :process_stack
#
# Lists all workflow (process) instances currently in the expool (in
# the engine).
# This method will return a list of "process-definition" expressions
# (i.e. OpenWFE::DefineExpression objects -- each representing the root
# element of a flow).
#
# :wfid ::
# will list only one process,
# :wfid => '20071208-gipijiwozo'
# :parent_wfid ::
# will list only one process, and its subprocesses,
# :parent_wfid => '20071208-gipijiwozo'
# :consider_subprocesses ::
# if true, "process-definition" expressions
# of subprocesses will be returned as well.
# :wfid_prefix ::
# allows your to query for specific workflow instance
# id prefixes. for example :
# :wfid_prefix => "200712"
# for the processes started in December.
# :wfname ::
# will return only the process instances who belongs to the given
# workflow [name].
# :wfrevision ::
# usued in conjuction with :wfname, returns only the process
# instances of a given workflow revision.
#
def list_processes (options={})
get_expression_pool.list_processes options
end
alias :list_workflows :list_processes
#
# Given any expression of a process, cancels the complete process
# instance.
#
def cancel_process (exp_or_wfid)
get_expression_pool.cancel_process exp_or_wfid
end
alias :cancel_flow :cancel_process
alias :abort_process :cancel_process
#
# Cancels the given expression (and its children if any)
# (warning : advanced method)
#
# Cancelling the root expression of a process is equivalent to
# cancelling the process.
#
def cancel_expression (exp_or_fei)
get_expression_pool.cancel_expression exp_or_fei
end
#
# Forgets the given expression (make it an orphan)
# (warning : advanced method)
#
def forget_expression (exp_or_fei)
get_expression_pool.forget exp_or_fei
end
#
# Pauses a process (sets its /__paused__ variable to true).
#
def pause_process (wfid)
wfid = extract_wfid wfid
root_expression = get_expression_pool.fetch_root wfid
get_expression_pool.paused_instances[wfid] = true
root_expression.set_variable VAR_PAUSED, true
end
#
# Restarts a process : removes its 'paused' flag (variable) and makes
# sure to 'replay' events (replies) that came for it while it was
# in pause.
#
def resume_process (wfid)
wfid = extract_wfid wfid
root_expression = get_expression_pool.fetch_root wfid
#
# remove 'paused' flag
get_expression_pool.paused_instances.delete wfid
root_expression.unset_variable VAR_PAUSED
#
# replay
#
# select PausedError instances in separate list
errors = get_error_journal.get_error_log wfid
error_class = PausedError.name
paused_errors = errors.select { |e| e.error_class == error_class }
return if paused_errors.size < 1
# replay select PausedError instances
paused_errors.each do |e|
replay_at_error e
end
end
#
# Takes care of removing an error from the error journal and
# they replays its process at that point.
#
def replay_at_error (error)
get_error_journal.remove_errors(
error.fei.parent_wfid,
error)
get_expression_pool.queue_work(
error.message,
error.fei,
error.workitem)
end
#
# Looks up a process variable in a process.
# If fei_or_wfid is not given, will simply look in the
# 'engine environment' (where the top level variables '//' do reside).
#
def lookup_variable (var_name, fei_or_wfid=nil)
return get_expression_pool.fetch_engine_environment[var_name] \
unless fei_or_wfid
fetch_exp(fei_or_wfid).lookup_variable var_name
end
#
# Returns the variables set for a process or an expression.
#
# If a process (wfid) is given, variables of the process environment
# will be returned, else variables in the environment valid for the
# expression (fei) will be returned.
#
# If nothing (or nil) is given, the variables set in the engine
# environment will be returned.
#
def get_variables (fei_or_wfid=nil)
return get_expression_pool.fetch_engine_environment.variables \
unless fei_or_wfid
fetch_exp(fei_or_wfid).get_environment.variables
end
#
# Returns an array of wfid (workflow instance ids) whose root
# environment containes the given variable
#
# If there are no matches, an empty array will be returned.
#
# Regular expressions are accepted as values.
#
# If no value is given, all processes with the given variable name
# set will be returned.
#
def lookup_processes (var_name, value=nil)
# TODO : maybe this would be better in the ExpressionPool
regexp = if value
if value.is_a?(Regexp)
value
else
Regexp.compile(value.to_s)
end
else
nil
end
envs = get_expression_storage.find_expressions(
:include_classes => Environment)
envs = envs.find_all do |env|
val = env.variables[var_name]
(val and ((not regexp) or (regexp.match(val))))
end
envs.collect do |env|
env.fei.wfid
end
#envs.inject([]) do |r, env|
# val = env.variables[var_name]
# r << env.fei.wfid \
# if (val and ((not regexp) or (regexp.match(val))))
# r
#end
#
# seems slower...
end
#
# Use only when doing "process gardening".
#
# This method updates an expression, the 'data' parameter is expected
# to be a hash. If the expression is an Environment, the variables
# will be merged with the ones found in the data param.
# If the expression is not an Environment, the data will be merged
# into the 'applied_workitem' if any.
#
# If the merge is not possible, an exception will be raised.
#
def update_expression_data (fei, data)
fexp = fetch_exp fei
original = if fexp.is_a?(Environment)
fexp.variables
else
fexp.applied_workitem.attributes
end
original.merge! data
get_expression_pool.update fexp
end
#
# A variant of update_expression() that directly replaces
# the raw representation stored within a RawExpression.
#
# Useful for modifying [not yet reached] segments of processes.
#
def update_raw_expression (fei, representation)
fexp = fetch_exp fei
raise "cannot update already applied expression" \
unless fexp.is_a?(RawExpression)
fexp.raw_representation = representation
get_expression_pool.update fexp
end
#
# Replaces an expression in the pool with a newer version of it.
#
# (useful when fixing processes on the fly)
#
def update_expression (fexp)
fexp.application_context = application_context
get_expression_pool.update fexp
end
protected
#--
# the following methods may get overridden upon extension
# see for example file_persisted_engine.rb
#++
def build_expression_map
@application_context[S_EXPRESSION_MAP] = ExpressionMap.new
#
# the expression map is not a Service anymore,
# it's a simple instance (that will be reused in other
# OpenWFEru components)
end
#
# This implementation builds a KotobaWfidGenerator instance and
# binds it in the engine context.
# There are other WfidGeneration implementations available, like
# UuidWfidGenerator or FieldWfidGenerator.
#
def build_wfid_generator
#init_service S_WFID_GENERATOR, DefaultWfidGenerator
#init_service S_WFID_GENERATOR, UuidWfidGenerator
init_service S_WFID_GENERATOR, KotobaWfidGenerator
#g = FieldWfidGenerator.new(
# S_WFID_GENERATOR, @application_context, "wfid")
#
# showing how to initialize a FieldWfidGenerator that
# will take as workflow instance id the value found in
# the field "wfid" of the LaunchItem.
end
#
# Builds the OpenWFEru expression pool (the core of the engine)
# and binds it in the engine context.
# There is only one implementation of the expression pool, so
# this method is usually never overriden.
#
def build_expression_pool
init_service S_EXPRESSION_POOL, ExpressionPool
end
#
# The implementation here builds an InMemoryExpressionStorage
# instance.
#
# See FilePersistedEngine or CachedFilePersistedEngine for
# overrides of this method.
#
def build_expression_storage
init_service S_EXPRESSION_STORAGE, InMemoryExpressionStorage
end
#
# The ParticipantMap is a mapping between participant names
# (well rather regular expressions) and participant implementations
# (see http://openwferu.rubyforge.org/participants.html)
#
def build_participant_map
init_service S_PARTICIPANT_MAP, ParticipantMap
end
#
# There is only one Scheduler implementation, that's the one
# built and bound here.
#
def build_scheduler
scheduler = Rufus::Scheduler.new
@application_context[S_SCHEDULER] = scheduler
scheduler.start
end
#
# The default implementation of this method uses an
# InMemoryErrorJournal (do not use in production).
#
def build_error_journal
init_service S_ERROR_JOURNAL, InMemoryErrorJournal
end
#
# Turns the raw launch request info into a LaunchItem instance.
#
def extract_launchitem (launch_object)
if launch_object.kind_of?(OpenWFE::LaunchItem)
launch_object
elsif launch_object.kind_of?(Class)
LaunchItem.new launch_object
elsif launch_object.kind_of?(String)
li = OpenWFE::LaunchItem.new
if launch_object[0, 1] == '<' or launch_object.index("\n")
li.workflow_definition_url = "field:__definition"
li['__definition'] = launch_object
else
li.workflow_definition_url = launch_object
end
li
end
end
#
# In case of wfid, returns the root expression of the process,
# in case of fei, returns the expression itself.
#
def fetch_exp (fei_or_wfid)
exp = if fei_or_wfid.is_a?(String)
get_expression_pool.fetch_root fei_or_wfid
else
get_expression_pool.fetch_expression fei_or_wfid
end
exp or raise "no expression found for '#{fei_or_wfid.to_s}'"
end
end
end