lib/openwfe/engine/engine.rb in ruote-0.9.18 vs lib/openwfe/engine/engine.rb in ruote-0.9.19
- old
+ new
@@ -1,44 +1,44 @@
#
#--
# Copyright (c) 2006-2008, John Mettraux, Nicolas Modrzyk OpenWFE.org
# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
+#
+# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
+# list of conditions and the following disclaimer.
+#
+# . Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
-#
+#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
# Nicolas Modrzyk at openwfe.org
-#
+#
require 'logger'
require 'fileutils'
require 'rufus/scheduler' # gem 'rufus-scheduler'
@@ -47,10 +47,12 @@
require 'openwfe/rudefinitions'
require 'openwfe/service'
require 'openwfe/workitem'
require 'openwfe/util/irb'
require 'openwfe/util/workqueue'
+require 'openwfe/util/treechecker'
+require 'openwfe/expool/parser'
require 'openwfe/expool/wfidgen'
require 'openwfe/expool/expressionpool'
require 'openwfe/expool/expstorage'
require 'openwfe/expool/errorjournal'
require 'openwfe/engine/expool_methods'
@@ -62,695 +64,657 @@
require 'openwfe/participants/participantmap'
module OpenWFE
- #
- # The simplest implementation of the OpenWFE workflow engine.
- # No persistence is used, everything is stored in memory.
- #
- class Engine < Service
+ #
+ # The simplest implementation of the OpenWFE workflow engine.
+ # No persistence is used, everything is stored in memory.
+ #
+ class Engine < Service
- include OwfeServiceLocator
- include FeiMixin
+ include OwfeServiceLocator
+ include FeiMixin
- include ExpoolMethods
- include StatusMethods
- include ParticipantMethods
- include UpdateExpMethods
+ include ExpoolMethods
+ include StatusMethods
+ include ParticipantMethods
+ include UpdateExpMethods
- #
- # The name of the engine, will be used to 'stamp' each expression
- # active in the engine (and thus indirectrly, each workitem)
- #
- attr_reader :engine_name
+ #
+ # The name of the engine, will be used to 'stamp' each expression
+ # active in the engine (and thus indirectrly, each workitem)
+ #
+ attr_reader :engine_name
- #
- # Builds an OpenWFEru engine.
- #
- # Accepts an optional initial application_context (containing
- # initialization params for services for example).
- #
- # The engine itself uses one param :logger, used to define
- # where all the log output for OpenWFEru should go.
- # By default, this output goes to logs/openwferu.log
- #
- def initialize (application_context={})
+ #
+ # Builds an OpenWFEru engine.
+ #
+ # Accepts an optional initial application_context (containing
+ # initialization params for services for example).
+ #
+ # The engine itself uses one param :logger, used to define
+ # where all the log output for OpenWFEru should go.
+ # By default, this output goes to logs/openwferu.log
+ #
+ def initialize (application_context={})
- super S_ENGINE, application_context
+ super :s_engine, application_context
- @engine_name = application_context[:engine_name] || 'engine'
+ @engine_name = application_context[:engine_name] || 'engine'
- $OWFE_LOG = application_context[:logger]
+ $OWFE_LOG = application_context[:logger]
- unless $OWFE_LOG
- #puts "Creating logs in " + FileUtils.pwd
- FileUtils.mkdir("logs") unless File.exist?("logs")
- $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
- $OWFE_LOG.level = Logger::INFO
- end
+ unless $OWFE_LOG
+ #puts "Creating logs in " + FileUtils.pwd
+ FileUtils.mkdir("logs") unless File.exist?("logs")
+ $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
+ $OWFE_LOG.level = Logger::INFO
+ end
- # build order matters.
- #
- # especially for the expstorage which 'observes' the expression
- # pool and thus needs to be instantiated after it.
+ # build order matters.
+ #
+ # especially for the expstorage which 'observes' the expression
+ # pool and thus needs to be instantiated after it.
- build_scheduler
- #
- # for delayed or repetitive executions (it's the engine's clock)
- # see http://openwferu.rubyforge.org/scheduler.html
+ build_scheduler
+ #
+ # for delayed or repetitive executions (it's the engine's clock)
+ # see http://openwferu.rubyforge.org/scheduler.html
- build_expression_map
- #
- # mapping expression names ('sequence', 'if', 'concurrence',
- # 'when'...) to their implementations (SequenceExpression,
- # IfExpression, ConcurrenceExpression, ...)
+ build_expression_map
+ #
+ # mapping expression names ('sequence', 'if', 'concurrence',
+ # 'when'...) to their implementations (SequenceExpression,
+ # IfExpression, ConcurrenceExpression, ...)
- build_wfid_generator
- #
- # the workflow instance (process instance) id generator
- # making sure each process instance has a unique identifier
+ build_wfid_generator
+ #
+ # the workflow instance (process instance) id generator
+ # making sure each process instance has a unique identifier
- build_workqueue
- #
- # where apply/reply get queued and processed asynchronously
- # by a single thread
+ build_workqueue
+ #
+ # where apply/reply get queued and processed asynchronously
+ # by a single thread
- build_expression_pool
- #
- # the core (hairy ball) of the engine
+ build_expression_pool
+ #
+ # the core (hairy ball) of the engine
- build_expression_storage
- #
- # the engine persistence (persisting the expression instances
- # that make up process instances)
+ build_expression_storage
+ #
+ # the engine persistence (persisting the expression instances
+ # that make up process instances)
- build_participant_map
- #
- # building the services that maps participant names to
- # participant implementations / instances.
+ build_participant_map
+ #
+ # building the services that maps participant names to
+ # participant implementations / instances.
- build_error_journal
- #
- # builds the error journal (keeping track of failures
- # in business process executions, and an opportunity to
- # fix and replay)
+ build_error_journal
+ #
+ # builds the error journal (keeping track of failures
+ # in business process executions, and an opportunity to
+ # fix and replay)
- linfo { "new() --- engine started --- #{self.object_id}" }
- end
-
+ build_tree_checker
#
- # Call this method once the participants for a persisted engine
- # have been [re]added.
- #
- # If this method is called too soon, missing participants will
- # cause trouble... Call this method after all the participants
- # have been added.
- #
- def reschedule
+ # builds the tree checker (the thing that checks incoming external
+ # ruby code for evil things)
- get_expression_pool.reschedule()
- end
-
- alias :reload :reschedule
-
+ build_def_parser
#
- # When 'parameters' are used at the top of a process definition, this
- # method can be used to assert a launchitem before launch.
- # An expression will be raised if the parameters do not match the
- # requirements.
- #
- # Note that the launch method will raise those exceptions as well.
- # This method can be useful in some scenarii though.
- #
- def pre_launch_check (launchitem)
+ # builds the definition parser (the thing that turns process definitions
+ # into actual expression trees, ready for execution).
- get_expression_pool.prepare_raw_expression(launchitem)
- end
+ linfo { "new() --- engine started --- #{self.object_id}" }
+ end
- #
- # Launches a [business] process.
- # The 'launch_object' param may contain either a LaunchItem instance,
- # either a String containing the URL of the process definition
- # to launch (with an empty LaunchItem created on the fly).
- #
- # The launch object can also be a String containing the XML process
- # definition or directly a class extending OpenWFE::ProcessDefinition
- # (Ruby process definition).
- #
- # Returns the FlowExpressionId instance of the expression at the
- # root of the newly launched process.
- #
- # Options for scheduled launches like :at, :in and :cron are accepted
- # via the 'options' optional parameter.
- # For example :
- #
- # engine.launch(launch_item)
- # # will launch immediately
- #
- # engine.launch(launch_item, :in => "1d20m")
- # # will launch in one day and twenty minutes
- #
- # engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
- # # will launch at that point in time
- #
- # engine.launch(launch_item, :cron => "0 5 * * *")
- # # will launch that same process every day,
- # # five minutes after midnight (see "man 5 crontab")
- #
- def launch (launch_object, options={})
+ #
+ # Call this method once the participants for a persisted engine
+ # have been [re]added.
+ #
+ # If this method is called too soon, missing participants will
+ # cause trouble... Call this method after all the participants
+ # have been added.
+ #
+ def reschedule
- launchitem = extract_launchitem launch_object
+ get_expression_pool.reschedule()
+ end
- fei = get_expression_pool.launch launchitem, options
+ alias :reload :reschedule
- #linfo { "launch() #{fei.wfid} : #{fei.wfname} #{fei.wfrevision}" }
+ #
+ # When 'parameters' are used at the top of a process definition, this
+ # method can be used to assert a launchitem before launch.
+ # An expression will be raised if the parameters do not match the
+ # requirements.
+ #
+ # Note that the launch method will raise those exceptions as well.
+ # This method can be useful in some scenarii though.
+ #
+ def pre_launch_check (launchitem)
- fei.dup
- #
- # so that users of this launch() method can play with their
- # fei without breaking things
- end
+ get_expression_pool.prepare_raw_expression(launchitem)
+ end
- #
- # This method is used to feed a workitem back to the engine (after
- # it got sent to a worklist or wherever by a participant).
- # Participant implementations themselves do call this method usually.
- #
- # This method also accepts LaunchItem instances.
- #
- # Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem
- # that don't belong to a process instance (ie whose flow_expression_id
- # is nil). It will simply notify the participant_map of the reply
- # for the given participant_name. If there is no participant_name
- # specified for this orphan workitem, an exception will be raised.
- #
- def reply (workitem)
+ #
+ # Launches a [business] process.
+ # The 'launch_object' param may contain either a LaunchItem instance,
+ # either a String containing the URL of the process definition
+ # to launch (with an empty LaunchItem created on the fly).
+ #
+ # The launch object can also be a String containing the XML process
+ # definition or directly a class extending OpenWFE::ProcessDefinition
+ # (Ruby process definition).
+ #
+ # Returns the FlowExpressionId instance of the expression at the
+ # root of the newly launched process.
+ #
+ # Options for scheduled launches like :at, :in and :cron are accepted
+ # via the 'options' optional parameter.
+ # For example :
+ #
+ # engine.launch(launch_item)
+ # # will launch immediately
+ #
+ # engine.launch(launch_item, :in => "1d20m")
+ # # will launch in one day and twenty minutes
+ #
+ # engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
+ # # will launch at that point in time
+ #
+ # engine.launch(launch_item, :cron => "0 5 * * *")
+ # # will launch that same process every day,
+ # # five minutes after midnight (see "man 5 crontab")
+ #
+ # === :wait_for
+ #
+ # If you really need that, you can launch a process and wait for its
+ # termination (or cancellation or error) as in :
+ #
+ # engine.launch(launch_item, :wait_for => true)
+ # # will launch and return only when the process is over
+ #
+ # Note that if you set the option :wait_for to true, a triplet will
+ # be returned instead of just a FlowExpressionId.
+ #
+ # This triplet is composed of [ message, info, fei ]
+ # where message is :terminate, :error or :cancel and info contains
+ # either the workitem, the error or a wfid, respectively.
+ #
+ # See http://groups.google.com/group/openwferu-users/browse_frm/thread/ffd0589bdc877765 for more about this triplet.
+ #
+ # (Note that the current implementation of this :wait_for will return if
+ # any error was found. Thus, if an error occurs in a concurrent branch
+ # and the other branch goes on, the launch() will return, even if the
+ # rest of the process is continuing).
+ #
+ def launch (launch_object, options={})
- if workitem.is_a?(InFlowWorkItem)
+ launchitem = extract_launchitem launch_object
- if workitem.flow_expression_id
- #
- # vanilla case, workitem coming back
- # (from listener probably)
+ fei = get_expression_pool.launch launchitem, options
- return get_expression_pool.reply(
- workitem.flow_expression_id, workitem)
- end
+ #linfo { "launch() #{fei.wfid} : #{fei.wfname} #{fei.wfrevision}" }
- if workitem.participant_name
- #
- # a workitem that doesn't belong to a process instance
- # but bears a participant name.
- # Notify, there may be something listening on
- # this channel (see the 'listen' expression).
+ fei.dup
+ #
+ # so that users of this launch() method can play with their
+ # fei without breaking things
+ end
- return get_participant_map.onotify(
- workitem.participant_name, :reply, workitem)
- end
+ #
+ # This method is used to feed a workitem back to the engine (after
+ # it got sent to a worklist or wherever by a participant).
+ # Participant implementations themselves do call this method usually.
+ #
+ # This method also accepts LaunchItem instances.
+ #
+ # Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem
+ # that don't belong to a process instance (ie whose flow_expression_id
+ # is nil). It will simply notify the participant_map of the reply
+ # for the given participant_name. If there is no participant_name
+ # specified for this orphan workitem, an exception will be raised.
+ #
+ def reply (workitem)
- raise \
- "InFlowWorkitem doesn't belong to a process instance" +
- " nor to a participant"
- end
+ if workitem.is_a?(InFlowWorkItem)
- return get_expression_pool.launch(workitem) \
- if workitem.is_a?(LaunchItem)
- #
- # launchitem coming from listener
- # let's attempt to launch a new process instance
+ if workitem.flow_expression_id
+ #
+ # vanilla case, workitem coming back
+ # (from listener probably)
- raise \
- "engine.reply() " +
- "cannot handle instances of #{workitem.class}"
+ return get_expression_pool.reply(
+ workitem.flow_expression_id, workitem)
end
- alias :forward :reply
- alias :proceed :reply
+ if workitem.participant_name
+ #
+ # a workitem that doesn't belong to a process instance
+ # but bears a participant name.
+ # Notify, there may be something listening on
+ # this channel (see the 'listen' expression).
- #
- # Adds a workitem listener to this engine.
- #
- # The 'freq' parameters if present might indicate how frequently
- # the resource should be polled for incoming workitems.
- #
- # engine.add_workitem_listener(listener, "3m10s")
- # # every 3 minutes and 10 seconds
- #
- # engine.add_workitem_listener(listener, "0 22 * * 1-5")
- # # every weekday at 10pm
- #
- # TODO : block handling...
- #
- def add_workitem_listener (listener, freq=nil)
+ return get_participant_map.onotify(
+ workitem.participant_name, :reply, workitem)
+ end
- name = nil
+ raise \
+ "InFlowWorkitem doesn't belong to a process instance" +
+ " nor to a participant"
+ end
- if listener.kind_of? Class
+ return get_expression_pool.launch(workitem) \
+ if workitem.is_a?(LaunchItem)
+ #
+ # launchitem coming from listener
+ # let's attempt to launch a new process instance
- listener = init_service nil, listener
+ raise \
+ "engine.reply() " +
+ "cannot handle instances of #{workitem.class}"
+ end
- name = listener.service_name
- else
+ alias :forward :reply
+ alias :proceed :reply
- name = listener.name if listener.respond_to? :name
- name = "#{listener.class}::#{listener.object_id}" unless name
+ #
+ # Adds a workitem listener to this engine.
+ #
+ # The 'freq' parameters if present might indicate how frequently
+ # the resource should be polled for incoming workitems.
+ #
+ # engine.add_workitem_listener(listener, "3m10s")
+ # # every 3 minutes and 10 seconds
+ #
+ # engine.add_workitem_listener(listener, "0 22 * * 1-5")
+ # # every weekday at 10pm
+ #
+ # TODO : block handling...
+ #
+ def add_workitem_listener (listener, freq=nil)
- @application_context[name] = listener
- end
+ name = nil
- result = nil
+ if listener.kind_of?(Class)
- if freq
+ listener = init_service nil, listener
- freq = freq.to_s.strip
+ name = listener.service_name
+ else
- result = if Rufus::Scheduler.is_cron_string(freq)
+ name = listener.name if listener.respond_to?(:name)
+ name = "#{listener.class}::#{listener.object_id}" unless name
- get_scheduler.schedule(freq, listener)
- else
+ @application_context[name] = listener
+ end
- get_scheduler.schedule_every(freq, listener)
- end
- end
+ result = nil
- linfo { "add_workitem_listener() added '#{name}'" }
+ if freq
- result
- end
+ freq = freq.to_s.strip
- #
- # Makes the current thread join the engine's scheduler thread
- #
- # You can thus make an engine standalone with something like :
- #
- # require 'openwfe/engine/engine'
- #
- # the_engine = OpenWFE::Engine.new
- # the_engine.join
- #
- # And you'll have to hit CTRL-C to make it stop.
- #
- def join
+ result = if Rufus::Scheduler.is_cron_string(freq)
- get_scheduler.join
- end
+ get_scheduler.schedule(freq, listener)
+ else
- #
- # Calling this method makes the control flow block until the
- # workflow engine is inactive.
- #
- # TODO : implement idle_for
- #
- def join_until_idle
-
- storage = get_expression_storage
-
- while storage.size > 1
- sleep 1
- end
+ get_scheduler.schedule_every(freq, listener)
end
+ end
- #
- # Enabling the console means that hitting CTRL-C on the window /
- # term / dos box / whatever does run the OpenWFEru engine will
- # open an IRB interactive console for directly manipulating the
- # engine instance.
- #
- # Hit CTRL-D to get out of the console.
- #
- def enable_irb_console
+ linfo { "add_workitem_listener() added '#{name}'" }
- OpenWFE::trap_int_irb(binding)
- end
+ result
+ end
- #--
- # Makes sure that hitting CTRL-C will actually kill the engine VM and
- # not open an IRB console.
- #
- #def disable_irb_console
- # $openwfe_irb = nil
- # trap 'INT' do
- # exit 0
- # end
- #end
- #++
+ #
+ # Makes the current thread join the engine's scheduler thread
+ #
+ # You can thus make an engine standalone with something like :
+ #
+ # require 'openwfe/engine/engine'
+ #
+ # the_engine = OpenWFE::Engine.new
+ # the_engine.join
+ #
+ # And you'll have to hit CTRL-C to make it stop.
+ #
+ def join
- #
- # Stopping the engine will stop all the services in the
- # application context.
- #
- def stop
+ get_scheduler.join
+ end
- linfo { "stop() stopping engine '#{@service_name}'" }
+ #
+ # Calling this method makes the control flow block until the
+ # workflow engine is inactive.
+ #
+ # TODO : implement idle_for
+ #
+ def join_until_idle
- @application_context.each do |sname, service|
+ storage = get_expression_storage
- next if sname == self.service_name
+ while storage.size > 1
+ sleep 1
+ end
+ end
- #if service.kind_of?(ServiceMixin)
- if service.respond_to?(:stop)
+ #
+ # Enabling the console means that hitting CTRL-C on the window /
+ # term / dos box / whatever does run the OpenWFEru engine will
+ # open an IRB interactive console for directly manipulating the
+ # engine instance.
+ #
+ # Hit CTRL-D to get out of the console.
+ #
+ def enable_irb_console
- service.stop
+ OpenWFE::trap_int_irb(binding)
+ end
- linfo do
- "stop() stopped service '#{sname}' (#{service.class})"
- end
- end
- end
+ #--
+ # Makes sure that hitting CTRL-C will actually kill the engine VM and
+ # not open an IRB console.
+ #
+ #def disable_irb_console
+ # $openwfe_irb = nil
+ # trap 'INT' do
+ # exit 0
+ # end
+ #end
+ #++
- linfo { "stop() stopped engine '#{@service_name}'" }
+ #
+ # Stopping the engine will stop all the services in the
+ # application context.
+ #
+ def stop
- nil
- end
+ linfo { "stop() stopping engine '#{@service_name}'" }
- #
- # Waits for a given process instance to terminate.
- # The method only exits when the flow terminates, but beware : if
- # the process already terminated, the method will never exit.
- #
- # The parameter can be a FlowExpressionId instance, for example the
- # one given back by a launch(), or directly a workflow instance id
- # (String).
- #
- # This method is mainly used in utests.
- #
- def wait_for (fei_or_wfid)
+ @application_context.each do |sname, service|
- wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
- fei_or_wfid.workflow_instance_id
- else
- fei_or_wfid
- end
+ next if sname == self.service_name
- t = Thread.new { Thread.stop }
+ #if service.kind_of?(ServiceMixin)
+ if service.respond_to?(:stop)
- to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
- t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
- end
- te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
- t.wakeup if (fei.parent_wfid == wfid and t.alive?)
- end
+ service.stop
- t.join
-
- #tc = get_expression_pool.add_observer(:cancel) do |c, fe|
- # if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?)
- # sleep 0.500
- # t.wakeup
- # end
- #end
-
- linfo { "wait_for() #{wfid}" }
-
- get_expression_pool.remove_observer to, :terminate
- get_expression_pool.remove_observer te, :error
- #get_expression_pool.remove_observer tc, :cancel
- #
- # it would work as well without specifying the channel,
- # but it's thus a little bit faster
+ linfo do
+ "stop() stopped service '#{sname}' (#{service.class})"
+ end
end
+ end
- #
- # Pauses a process (sets its /__paused__ variable to true).
- #
- def pause_process (wfid)
+ linfo { "stop() stopped engine '#{@service_name}'" }
- wfid = extract_wfid wfid
+ nil
+ end
- root_expression = get_expression_pool.fetch_root wfid
+ #
+ # Waits for a given process instance to terminate.
+ # The method only exits when the flow terminates, but beware : if
+ # the process already terminated, the method will never exit.
+ #
+ # The parameter can be a FlowExpressionId instance, for example the
+ # one given back by a launch(), or directly a workflow instance id
+ # (String).
+ #
+ # This method is mainly used in utests.
+ #
+ def wait_for (fei_or_wfid)
- get_expression_pool.paused_instances[wfid] = true
- root_expression.set_variable VAR_PAUSED, true
- end
+ wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
+ fei_or_wfid.workflow_instance_id
+ else
+ fei_or_wfid
+ end
- #
- # Restarts a process : removes its 'paused' flag (variable) and makes
- # sure to 'replay' events (replies) that came for it while it was
- # in pause.
- #
- def resume_process (wfid)
+ get_expression_pool.send :wait_for, wfid
+ end
- wfid = extract_wfid wfid
+ #
+ # Looks up a process variable in a process.
+ # If fei_or_wfid is not given, will simply look in the
+ # 'engine environment' (where the top level variables '//' do reside).
+ #
+ def lookup_variable (var_name, fei_or_wfid=nil)
- root_expression = get_expression_pool.fetch_root wfid
+ return get_expression_pool.fetch_engine_environment[var_name] \
+ unless fei_or_wfid
- #
- # remove 'paused' flag
+ fetch_exp(fei_or_wfid).lookup_variable var_name
+ end
- get_expression_pool.paused_instances.delete wfid
- root_expression.unset_variable VAR_PAUSED
+ #
+ # Returns the variables set for a process or an expression.
+ #
+ # If a process (wfid) is given, variables of the process environment
+ # will be returned, else variables in the environment valid for the
+ # expression (fei) will be returned.
+ #
+ # If nothing (or nil) is given, the variables set in the engine
+ # environment will be returned.
+ #
+ def get_variables (fei_or_wfid=nil)
- #
- # replay
- #
- # select PausedError instances in separate list
+ return get_expression_pool.fetch_engine_environment.variables \
+ unless fei_or_wfid
- errors = get_error_journal.get_error_log wfid
- error_class = PausedError.name
- paused_errors = errors.select { |e| e.error_class == error_class }
+ fetch_exp(fei_or_wfid).get_environment.variables
+ end
- return if paused_errors.size < 1
+ #
+ # Returns an array of wfid (workflow instance ids) whose root
+ # environment contains the given variable
+ #
+ # If there are no matches, an empty array will be returned.
+ #
+ # Regular expressions are accepted as values.
+ #
+ # If no value is given, all processes with the given variable name
+ # set will be returned.
+ #
+ def lookup_processes (var_name, value=nil)
- # replay select PausedError instances
+ # TODO : maybe this would be better in the ExpressionPool
- paused_errors.each do |e|
- replay_at_error e
- end
- end
+ regexp = value.is_a?(Regexp) ? value : nil
- #
- # Takes care of removing an error from the error journal and
- # they replays its process at that point.
- #
- def replay_at_error (error)
+ envs = get_expression_storage.find_expressions(
+ :include_classes => Environment)
- get_error_journal.remove_errors(
- error.fei.parent_wfid,
- error)
+ envs = envs.find_all do |env|
- get_workqueue.push(
- get_expression_pool,
- :do_apply_reply,
- error.message,
- error.fei,
- error.workitem)
- end
+ val = env.variables[var_name]
- #
- # Looks up a process variable in a process.
- # If fei_or_wfid is not given, will simply look in the
- # 'engine environment' (where the top level variables '//' do reside).
- #
- def lookup_variable (var_name, fei_or_wfid=nil)
-
- return get_expression_pool.fetch_engine_environment[var_name] \
- unless fei_or_wfid
-
- fetch_exp(fei_or_wfid).lookup_variable var_name
+ #(val and ((not regexp) or (regexp.match(val))))
+ if val != nil
+ if regexp
+ regexp.match(val)
+ elsif value
+ val == value
+ else
+ true
+ end
+ else
+ false
end
+ end
- #
- # Returns the variables set for a process or an expression.
- #
- # If a process (wfid) is given, variables of the process environment
- # will be returned, else variables in the environment valid for the
- # expression (fei) will be returned.
- #
- # If nothing (or nil) is given, the variables set in the engine
- # environment will be returned.
- #
- def get_variables (fei_or_wfid=nil)
+ envs.collect { |env| env.fei.wfid }
- return get_expression_pool.fetch_engine_environment.variables \
- unless fei_or_wfid
-
- fetch_exp(fei_or_wfid).get_environment.variables
- end
-
+ #envs.inject([]) do |r, env|
+ # val = env.variables[var_name]
+ # r << env.fei.wfid \
+ # if (val and ((not regexp) or (regexp.match(val))))
+ # r
+ #end
#
- # Returns an array of wfid (workflow instance ids) whose root
- # environment containes the given variable
- #
- # If there are no matches, an empty array will be returned.
- #
- # Regular expressions are accepted as values.
- #
- # If no value is given, all processes with the given variable name
- # set will be returned.
- #
- def lookup_processes (var_name, value=nil)
+ # seems slower...
+ end
- # TODO : maybe this would be better in the ExpressionPool
+ protected
- regexp = if value
- if value.is_a?(Regexp)
- value
- else
- Regexp.compile(value.to_s)
- end
- else
- nil
- end
+ #--
+ # the following methods may get overridden upon extension
+ # see for example file_persisted_engine.rb
+ #++
- envs = get_expression_storage.find_expressions(
- :include_classes => Environment)
+ #
+ # Builds the ExpressionMap (the mapping between expression names
+ # and expression implementations).
+ #
+ def build_expression_map
- envs = envs.find_all do |env|
- val = env.variables[var_name]
- (val and ((not regexp) or (regexp.match(val))))
- end
- envs.collect do |env|
- env.fei.wfid
- end
+ @application_context[:s_expression_map] = ExpressionMap.new
+ #
+ # the expression map is not a Service anymore,
+ # it's a simple instance (that will be reused in other
+ # OpenWFEru components)
+ end
- #envs.inject([]) do |r, env|
- # val = env.variables[var_name]
- # r << env.fei.wfid \
- # if (val and ((not regexp) or (regexp.match(val))))
- # r
- #end
- #
- # seems slower...
- end
+ #
+ # This implementation builds a KotobaWfidGenerator instance and
+ # binds it in the engine context.
+ # There are other WfidGeneration implementations available, like
+ # UuidWfidGenerator or FieldWfidGenerator.
+ #
+ def build_wfid_generator
- protected
+ #init_service :s_wfid_generator, DefaultWfidGenerator
+ #init_service :s_wfid_generator, UuidWfidGenerator
+ init_service :s_wfid_generator, KotobaWfidGenerator
- #--
- # the following methods may get overridden upon extension
- # see for example file_persisted_engine.rb
- #++
+ #g = FieldWfidGenerator.new(
+ # :s_wfid_generator, @application_context, "wfid")
+ #
+ # showing how to initialize a FieldWfidGenerator that
+ # will take as workflow instance id the value found in
+ # the field "wfid" of the LaunchItem.
+ end
- #
- # Builds the ExpressionMap (the mapping between expression names
- # and expression implementations).
- #
- def build_expression_map
+ #
+ # Builds the workqueue where apply/reply work is queued
+ # and processed.
+ #
+ def build_workqueue
- @application_context[S_EXPRESSION_MAP] = ExpressionMap.new
- #
- # the expression map is not a Service anymore,
- # it's a simple instance (that will be reused in other
- # OpenWFEru components)
- end
+ init_service :s_workqueue, WorkQueue
+ end
- #
- # This implementation builds a KotobaWfidGenerator instance and
- # binds it in the engine context.
- # There are other WfidGeneration implementations available, like
- # UuidWfidGenerator or FieldWfidGenerator.
- #
- def build_wfid_generator
+ #
+ # Builds the OpenWFEru expression pool (the core of the engine)
+ # and binds it in the engine context.
+ # There is only one implementation of the expression pool, so
+ # this method is usually never overriden.
+ #
+ def build_expression_pool
- #init_service S_WFID_GENERATOR, DefaultWfidGenerator
- #init_service S_WFID_GENERATOR, UuidWfidGenerator
- init_service S_WFID_GENERATOR, KotobaWfidGenerator
+ init_service :s_expression_pool, ExpressionPool
+ end
- #g = FieldWfidGenerator.new(
- # S_WFID_GENERATOR, @application_context, "wfid")
- #
- # showing how to initialize a FieldWfidGenerator that
- # will take as workflow instance id the value found in
- # the field "wfid" of the LaunchItem.
- end
+ #
+ # The implementation here builds an InMemoryExpressionStorage
+ # instance.
+ #
+ # See FilePersistedEngine or CachedFilePersistedEngine for
+ # overrides of this method.
+ #
+ def build_expression_storage
- #
- # Builds the workqueue where apply/reply work is queued
- # and processed.
- #
- def build_workqueue
+ init_service :s_expression_storage, InMemoryExpressionStorage
+ end
- init_service S_WORKQUEUE, WorkQueue
- end
+ #
+ # The ParticipantMap is a mapping between participant names
+ # (well rather regular expressions) and participant implementations
+ # (see http://openwferu.rubyforge.org/participants.html)
+ #
+ def build_participant_map
- #
- # Builds the OpenWFEru expression pool (the core of the engine)
- # and binds it in the engine context.
- # There is only one implementation of the expression pool, so
- # this method is usually never overriden.
- #
- def build_expression_pool
+ init_service :s_participant_map, ParticipantMap
+ end
- init_service S_EXPRESSION_POOL, ExpressionPool
- end
+ #
+ # There is only one Scheduler implementation, that's the one
+ # built and bound here.
+ #
+ def build_scheduler
- #
- # The implementation here builds an InMemoryExpressionStorage
- # instance.
- #
- # See FilePersistedEngine or CachedFilePersistedEngine for
- # overrides of this method.
- #
- def build_expression_storage
+ @application_context[:s_scheduler] = Rufus::Scheduler.start_new(
+ :thread_name =>
+ "rufus scheduler for Ruote (engine #{self.object_id})")
- init_service S_EXPRESSION_STORAGE, InMemoryExpressionStorage
- end
+ @application_context[:s_scheduler].extend Logging
- #
- # The ParticipantMap is a mapping between participant names
- # (well rather regular expressions) and participant implementations
- # (see http://openwferu.rubyforge.org/participants.html)
- #
- def build_participant_map
+ linfo { "build_scheduler() version is #{Rufus::Scheduler::VERSION}" }
+ end
- init_service S_PARTICIPANT_MAP, ParticipantMap
- end
-
- #
- # There is only one Scheduler implementation, that's the one
- # built and bound here.
- #
- def build_scheduler
+ #
+ # The default implementation of this method uses an
+ # InMemoryErrorJournal (do not use in production).
+ #
+ def build_error_journal
- s = Rufus::Scheduler.new(
- :thread_name =>
- "rufus scheduler for Ruote (engine #{self.object_id})")
+ init_service :s_error_journal, InMemoryErrorJournal
+ end
- @application_context[S_SCHEDULER] = s
+ #
+ # builds the tree checker (see lib/openwfe/util/treechecker.rb)
+ #
+ def build_tree_checker
- s.start
- end
+ init_service :s_tree_checker, OpenWFE::TreeChecker
+ end
- #
- # The default implementation of this method uses an
- # InMemoryErrorJournal (do not use in production).
- #
- def build_error_journal
+ #
+ # builds the service that turn process definitions into runnable
+ # expression trees...
+ #
+ def build_def_parser
- init_service S_ERROR_JOURNAL, InMemoryErrorJournal
- end
+ init_service :s_def_parser, DefParser
+ end
- #
- # Turns the raw launch request info into a LaunchItem instance.
- #
- def extract_launchitem (launch_object)
+ #
+ # Turns the raw launch request info into a LaunchItem instance.
+ #
+ def extract_launchitem (launch_object)
- if launch_object.kind_of?(OpenWFE::LaunchItem)
+ if launch_object.kind_of?(OpenWFE::LaunchItem)
- launch_object
+ launch_object
- elsif launch_object.kind_of?(Class)
+ elsif launch_object.kind_of?(Class)
- LaunchItem.new launch_object
+ LaunchItem.new launch_object
- elsif launch_object.kind_of?(String)
+ elsif launch_object.kind_of?(String)
- li = OpenWFE::LaunchItem.new
+ li = OpenWFE::LaunchItem.new
- if launch_object[0, 1] == '<' or launch_object.index("\n")
+ if launch_object[0, 1] == '<' or launch_object.index("\n")
- li.workflow_definition_url = "field:__definition"
- li['__definition'] = launch_object
+ li.workflow_definition_url = "field:__definition"
+ li['__definition'] = launch_object
- else
+ else
- li.workflow_definition_url = launch_object
- end
+ li.workflow_definition_url = launch_object
+ end
- li
- end
- end
- end
+ li
+ end
+ end
+ end
end