lib/openwfe/engine/engine.rb in openwferu-0.9.16 vs lib/openwfe/engine/engine.rb in openwferu-0.9.17
- old
+ new
@@ -1,8 +1,8 @@
#
#--
-# Copyright (c) 2006-2007, John Mettraux, Nicolas Modrzyk OpenWFE.org
+# Copyright (c) 2006-2008, John Mettraux, Nicolas Modrzyk OpenWFE.org
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
@@ -39,21 +39,22 @@
#
require 'logger'
require 'fileutils'
+require 'rufus/scheduler' # gem 'rufus-scheduler'
+
require 'openwfe/omixins'
require 'openwfe/rudefinitions'
require 'openwfe/service'
require 'openwfe/workitem'
require 'openwfe/util/irb'
-require 'openwfe/util/scheduler'
-require 'openwfe/util/schedulers'
require 'openwfe/expool/wfidgen'
require 'openwfe/expool/expressionpool'
require 'openwfe/expool/expstorage'
require 'openwfe/expool/errorjournal'
+require 'openwfe/engine/process_status'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/expressionmap'
require 'openwfe/participants/participantmap'
@@ -64,10 +65,11 @@
# No persistence is used, everything is stored in memory.
#
class Engine < Service
include OwfeServiceLocator
include FeiMixin
+ include StatusMixin
#
# Builds an OpenWFEru engine.
#
# Accepts an optional initial application_context (containing
@@ -77,11 +79,11 @@
# where all the log output for OpenWFEru should go.
# By default, this output goes to logs/openwferu.log
#
def initialize (application_context={})
- super(S_ENGINE, application_context)
+ super S_ENGINE, application_context
$OWFE_LOG = application_context[:logger]
unless $OWFE_LOG
#puts "Creating logs in " + FileUtils.pwd
@@ -264,17 +266,53 @@
#
# Registers a participant in this [embedded] engine.
# This method is a shortcut to the ParticipantMap method
# with the same name.
#
+ # engine.register_participant "user-.*", HashParticipant
+ #
+ # or
+ #
+ # engine.register_participant "user-.*" do |wi|
+ # puts "participant '#{wi.participant_name}' received a workitem"
+ # #
+ # # and did nothing with it
+ # # as a block participant implicitely returns the workitem
+ # # to the engine
+ # end
+ #
# Returns the participant instance.
#
+ # The participant parameter can be set to hash like in
+ #
+ # engine.register_participant(
+ # "alpha",
+ # { :participant => HashParticipant, :position => :first })
+ #
+ # or
+ #
+ # engine.register_participant("alpha", :position => :first) do
+ # puts "first !"
+ # end
+ #
+ # There are some times where you have to position a participant first
+ # (especially with the regex technique).
+ #
# see ParticipantMap#register_participant
#
def register_participant (regex, participant=nil, &block)
- get_participant_map.register_participant(regex, participant, &block)
+ #get_participant_map.register_participant(
+ # regex, participant, &block)
+
+ params = if participant.class == Hash
+ participant
+ else
+ { :participant => participant }
+ end
+
+ get_participant_map.register_participant regex, params, &block
end
#
# Given a participant name, returns the participant in charge
# of handling workitems for that name.
@@ -329,11 +367,11 @@
if freq
freq = freq.to_s.strip
- result = if Scheduler.is_cron_string(freq)
+ result = if Rufus::Scheduler.is_cron_string(freq)
get_scheduler.schedule(freq, listener)
else
get_scheduler.schedule_every(freq, listener)
@@ -366,11 +404,11 @@
# Calling this method makes the control flow block until the
# workflow engine is inactive.
#
# TODO : implement idle_for
#
- def join_until_idle ()
+ def join_until_idle
storage = get_expression_storage
while storage.size > 1
sleep 1
@@ -408,21 +446,21 @@
#
def stop
linfo { "stop() stopping engine '#{@service_name}'" }
- @application_context.each do |name, service|
+ @application_context.each do |sname, service|
- next if name == self.service_name
+ next if sname == self.service_name
- #service.stop if service.respond_to? :stop
+ #if service.kind_of?(ServiceMixin)
+ if service.respond_to?(:stop)
- if service.kind_of? ServiceMixin
service.stop
+
linfo do
- "stop() stopped service '#{service.service_name}' "+
- "(#{service.class})"
+ "stop() stopped service '#{sname}' (#{service.class})"
end
end
end
linfo { "stop() stopped engine '#{@service_name}'" }
@@ -455,73 +493,29 @@
t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
end
te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
t.wakeup if (fei.parent_wfid == wfid and t.alive?)
end
+ #tc = get_expression_pool.add_observer(:cancel) do |c, fe|
+ # if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?)
+ # sleep 0.500
+ # t.wakeup
+ # end
+ #end
linfo { "wait_for() #{wfid}" }
t.join
get_expression_pool.remove_observer(to, :terminate)
get_expression_pool.remove_observer(te, :error)
+ #get_expression_pool.remove_observer(tc, :cancel)
#
# it would work as well without specifying the channel,
# but it's thus a little bit faster
end
- #
- # Returns a hash of ProcessStatus instances. The keys of the hash
- # are workflow instance ids.
- #
- # A ProcessStatus is a description of the state of a process instance.
- # It enumerates the expressions where the process is currently
- # located (waiting certainly) and the errors the process currently
- # has (hopefully none).
- #
- def list_process_status (wfid=nil)
-
- wfid = to_wfid(wfid) if wfid
-
- result = {}
-
- get_expression_storage.real_each(wfid) do |fei, fexp|
- next if fexp.kind_of?(Environment)
- next unless fexp.apply_time
- (result[fei.parent_wfid] ||= ProcessStatus.new) << fexp
- end
-
- result.values.each do |ps|
- get_error_journal.get_error_log(ps.wfid).each do |error|
- ps << error
- end
- end
-
- class << result
- def to_s
- pretty_print_process_status(self)
- end
- end
-
- result
- end
-
- #
- # list_process_status() will be deprecated at release 1.0.0
- #
- alias :get_process_status :list_process_status
-
- #
- # Returns the process status of one given process instance.
- #
- def process_status (wfid)
-
- wfid = to_wfid(wfid)
-
- list_process_status(wfid).values[0]
- end
-
#--
# METHODS FROM THE EXPRESSION POOL
#
# These methods are 'proxy' to method found in the expression pool.
# They are made available here for a simpler model.
@@ -533,76 +527,98 @@
# May be used to determine where a process instance currently is.
#
# This method returns all the expressions (the stack) a process
# went through to reach its current state.
#
- def get_process_stack (workflow_instance_id)
+ # If the unapplied optional parameter is set to true, all the
+ # expressions (even those not yet applied) that compose the process
+ # instance will be returned.
+ #
+ def process_stack (workflow_instance_id, unapplied=false)
- get_expression_pool.get_process_stack(workflow_instance_id)
+ get_expression_pool.process_stack workflow_instance_id, unapplied
end
- alias :get_flow_stack :get_process_stack
+ alias :get_process_stack :process_stack
+ alias :get_flow_stack :process_stack
#
# Lists all workflow (process) instances currently in the expool (in
# the engine).
# This method will return a list of "process-definition" expressions
# (i.e. OpenWFE::DefineExpression objects -- each representing the root
# element of a flow).
#
- # consider_subprocesses :: if true, "process-definition" expressions of
- # subprocesses will be returned as well.
- # wfid_prefix :: allows your to query for specific workflow instance
- # id prefixes.
+ # :wfid ::
+ # will list only one process,
+ # <tt>:wfid => '20071208-gipijiwozo'</tt>
+ # :parent_wfid ::
+ # will list only one process, and its subprocesses,
+ # <tt>:parent_wfid => '20071208-gipijiwozo'</tt>
+ # :consider_subprocesses ::
+ # if true, "process-definition" expressions
+ # of subprocesses will be returned as well.
+ # :wfid_prefix ::
+ # allows your to query for specific workflow instance
+ # id prefixes. for example :
+ # <tt>:wfid_prefix => "200712"</tt>
+ # for the processes started in December.
+ # :wfname ::
+ # will return only the process instances who belongs to the given
+ # workflow [name].
+ # :wfrevision ::
+ # usued in conjuction with :wfname, returns only the process
+ # instances of a given workflow revision.
#
- def list_processes (consider_subprocesses=false, wfid_prefix=nil)
+ def list_processes (options={})
- get_expression_pool.list_processes(
- consider_subprocesses, wfid_prefix)
+ get_expression_pool.list_processes options
end
alias :list_workflows :list_processes
#
# Given any expression of a process, cancels the complete process
# instance.
#
def cancel_process (exp_or_wfid)
- get_expression_pool.cancel_process(exp_or_wfid)
+ get_expression_pool.cancel_process exp_or_wfid
end
alias :cancel_flow :cancel_process
+ alias :abort_process :cancel_process
#
# Cancels the given expression (and its children if any)
# (warning : advanced method)
#
# Cancelling the root expression of a process is equivalent to
# cancelling the process.
#
def cancel_expression (exp_or_fei)
- get_expression_pool.cancel_expression(exp_or_fei)
+ get_expression_pool.cancel_expression exp_or_fei
end
#
# Forgets the given expression (make it an orphan)
# (warning : advanced method)
#
def forget_expression (exp_or_fei)
- get_expression_pool.forget(exp_or_fei)
+ get_expression_pool.forget exp_or_fei
end
#
# Pauses a process (sets its /__paused__ variable to true).
#
def pause_process (wfid)
- wfid = extract_wfid(wfid)
+ wfid = extract_wfid wfid
- root_expression = get_expression_pool.fetch_root(wfid)
+ root_expression = get_expression_pool.fetch_root wfid
- root_expression.set_variable(VAR_PAUSED, true)
+ get_expression_pool.paused_instances[wfid] = true
+ root_expression.set_variable VAR_PAUSED, true
end
#
# Restarts a process : removes its 'paused' flag (variable) and makes
# sure to 'replay' events (replies) that came for it while it was
@@ -615,11 +631,12 @@
root_expression = get_expression_pool.fetch_root wfid
#
# remove 'paused' flag
- root_expression.unset_variable(VAR_PAUSED)
+ get_expression_pool.paused_instances.delete wfid
+ root_expression.unset_variable VAR_PAUSED
#
# replay
#
# select PausedError instances in separate list
@@ -661,22 +678,29 @@
def lookup_variable (var_name, fei_or_wfid=nil)
return get_expression_pool.fetch_engine_environment[var_name] \
unless fei_or_wfid
- exp = if fei_or_wfid.is_a?(String)
+ fetch_exp(fei_or_wfid).lookup_variable var_name
+ end
- get_expression_pool.fetch_root(fei_or_wfid)
+ #
+ # Returns the variables set for a process or an expression.
+ #
+ # If a process (wfid) is given, variables of the process environment
+ # will be returned, else variables in the environment valid for the
+ # expression (fei) will be returned.
+ #
+ # If nothing (or nil) is given, the variables set in the engine
+ # environment will be returned.
+ #
+ def get_variables (fei_or_wfid=nil)
- else
+ return get_expression_pool.fetch_engine_environment.variables \
+ unless fei_or_wfid
- get_expression_pool.fetch_expression(fei_or_wfid)
- end
-
- raise "no expression found for '#{fei_or_wfid.to_s}'" unless exp
-
- exp.lookup_variable var_name
+ fetch_exp(fei_or_wfid).get_environment.variables
end
#
# Returns an array of wfid (workflow instance ids) whose root
# environment containes the given variable
@@ -690,35 +714,99 @@
#
def lookup_processes (var_name, value=nil)
# TODO : maybe this would be better in the ExpressionPool
- result = []
-
regexp = if value
if value.is_a?(Regexp)
value
else
Regexp.compile(value.to_s)
end
else
nil
end
- get_expression_storage.each_of_kind(Environment) do |fei, env|
+ envs = get_expression_storage.find_expressions(
+ :include_classes => Environment)
+ envs = envs.find_all do |env|
val = env.variables[var_name]
+ (val and ((not regexp) or (regexp.match(val))))
+ end
+ envs.collect do |env|
+ env.fei.wfid
+ end
- next unless val
- next if regexp and (not regexp.match(val))
+ #envs.inject([]) do |r, env|
+ # val = env.variables[var_name]
+ # r << env.fei.wfid \
+ # if (val and ((not regexp) or (regexp.match(val))))
+ # r
+ #end
+ #
+ # seems slower...
+ end
- result.push env.fei.wfid
+ #
+ # Use only when doing "process gardening".
+ #
+ # This method updates an expression, the 'data' parameter is expected
+ # to be a hash. If the expression is an Environment, the variables
+ # will be merged with the ones found in the data param.
+ # If the expression is not an Environment, the data will be merged
+ # into the 'applied_workitem' if any.
+ #
+ # If the merge is not possible, an exception will be raised.
+ #
+ def update_expression_data (fei, data)
+
+ fexp = fetch_exp fei
+
+ original = if fexp.is_a?(Environment)
+
+ fexp.variables
+ else
+
+ fexp.applied_workitem.attributes
end
- result
+ original.merge! data
+
+ get_expression_pool.update fexp
end
+ #
+ # A variant of update_expression() that directly replaces
+ # the raw representation stored within a RawExpression.
+ #
+ # Useful for modifying [not yet reached] segments of processes.
+ #
+ def update_raw_expression (fei, representation)
+
+ fexp = fetch_exp fei
+
+ raise "cannot update already applied expression" \
+ unless fexp.is_a?(RawExpression)
+
+ fexp.raw_representation = representation
+
+ get_expression_pool.update fexp
+ end
+
+ #
+ # Replaces an expression in the pool with a newer version of it.
+ #
+ # (useful when fixing processes on the fly)
+ #
+ def update_expression (fexp)
+
+ fexp.application_context = application_context
+
+ get_expression_pool.update fexp
+ end
+
protected
#--
# the following methods may get overridden upon extension
# see for example file_persisted_engine.rb
@@ -790,11 +878,15 @@
# There is only one Scheduler implementation, that's the one
# built and bound here.
#
def build_scheduler
- init_service S_SCHEDULER, SchedulerService
+ scheduler = Rufus::Scheduler.new
+
+ @application_context[S_SCHEDULER] = scheduler
+
+ scheduler.start
end
#
# The default implementation of this method uses an
# InMemoryErrorJournal (do not use in production).
@@ -819,11 +911,10 @@
elsif launch_object.kind_of?(String)
li = OpenWFE::LaunchItem.new
- #if launch_object[0, 1] == '<' or launch_object.match("\n")
if launch_object[0, 1] == '<' or launch_object.index("\n")
li.workflow_definition_url = "field:__definition"
li['__definition'] = launch_object
@@ -834,166 +925,26 @@
li
end
end
- end
+ #
+ # In case of wfid, returns the root expression of the process,
+ # in case of fei, returns the expression itself.
+ #
+ def fetch_exp (fei_or_wfid)
- #
- # ProcessStatus represents information about the status of a workflow
- # process instance.
- #
- # The status is mainly a list of expressions and a hash of errors.
- #
- # Instances of this class are obtained via Engine.process_status().
- #
- class ProcessStatus
+ exp = if fei_or_wfid.is_a?(String)
- #
- # the String workflow instance id of the Process.
- #
- attr_reader :wfid
+ get_expression_pool.fetch_root fei_or_wfid
- #
- # The list of the expressions currently active in the process instance.
- #
- # For instance, if your process definition is currently in a
- # concurrence, more than one expressions may be listed here.
- #
- attr_reader :expressions
+ else
- #
- # A hash whose values are ProcessError instances, the keys
- # are FlowExpressionId instances (fei) (identifying the expressions
- # that are concerned with the error)
- #
- attr_reader :errors
-
- #
- # The time at which the process got launched.
- #
- attr_reader :launch_time
-
- def initialize
- @wfid = nil
- @expressions = []
- @errors = {}
- @launch_time = nil
- end
-
- #
- # Returns true if the process is in pause.
- #
- def paused?
-
- exp = @expressions[0]
- exp != nil and exp.paused?
- end
-
- #
- # this method is used by Engine.get_process_status() when
- # it prepares its results.
- #
- def << (item)
-
- if item.kind_of?(FlowExpression)
- add_expression item
- else
- add_error item
- end
- end
-
- #
- # A String representation, handy for debugging, quick viewing.
- #
- def to_s
- s = ""
- s << "-- #{self.class.name} --\n"
- s << " wfid : #{@wfid}\n"
- s << " expressions :\n"
- @expressions.each do |fexp|
- s << " #{fexp.fei}\n"
- end
- s << " errors : #{@errors.size}\n"
- s << " paused : #{paused?}"
- s
- end
-
- protected
-
- def add_expression (fexp)
-
- set_wfid fexp.fei.parent_wfid
-
- @launch_time = fexp.apply_time if fexp.fei.expid == '0'
-
- exps = @expressions
- @expressions = []
-
- added = false
- @expressions = exps.collect do |fe|
- if added or fe.fei.wfid != fexp.fei.wfid
- fe
- else
- if OpenWFE::starts_with(fexp.fei.expid, fe.fei.expid)
- added = true
- fexp
- elsif OpenWFE::starts_with(fe.fei.expid, fexp.fei.expid)
- added = true
- fe
- else
- fe
- end
- end
+ get_expression_pool.fetch_expression fei_or_wfid
end
- @expressions << fexp unless added
- end
- def add_error (error)
- @errors[error.fei] = error
+ exp or raise "no expression found for '#{fei_or_wfid.to_s}'"
end
-
- def set_wfid (wfid)
-
- return if @wfid
- @wfid = wfid
- end
- end
-
- #
- # Renders a nice, terminal oriented, representation of an
- # Engine.get_process_status() result.
- #
- # You usually directly benefit from this when doing
- #
- # puts engine.get_process_status.to_s
- #
- def pretty_print_process_status (ps)
-
- s = ""
- s << "process_id | name | rev | brn | err | paused? \n"
- s << "--------------------+-------------------+---------+-----+-----+---------\n"
-
- ps.keys.sort.each do |wfid|
-
- status = ps[wfid]
- fexp = status.expressions[0]
- ffei = fexp.fei
-
- s << "%-19s" % wfid[0, 19]
- s << " | "
- s << "%-17s" % ffei.workflow_definition_name[0, 17]
- s << " | "
- s << "%-7s" % ffei.workflow_definition_revision[0, 7]
- s << " | "
- s << "%3s" % status.expressions.size.to_s[0, 3]
- s << " | "
- s << "%3s" % status.errors.size.to_s[0, 3]
- s << " | "
- s << "%5s" % status.paused?.to_s
- s << "\n"
- end
- s
end
end