lib/openwfe/engine/engine.rb in ruote-0.9.19 vs lib/openwfe/engine/engine.rb in ruote-0.9.20
- old
+ new
@@ -1,45 +1,29 @@
-#
#--
-# Copyright (c) 2006-2008, John Mettraux, Nicolas Modrzyk OpenWFE.org
-# All rights reserved.
+# Copyright (c) 2006-2009, John Mettraux, Nicolas Modrzyk OpenWFE.org
#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
#
-# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
#
-# . Neither the name of the "OpenWFE" nor the names of its contributors may be
-# used to endorse or promote products derived from this software without
-# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
+# Made in Japan.
#++
-#
-#
-# "made in Japan"
-#
-# John Mettraux at openwfe.org
-# Nicolas Modrzyk at openwfe.org
-#
-
require 'logger'
require 'fileutils'
require 'rufus/scheduler' # gem 'rufus-scheduler'
@@ -48,22 +32,25 @@
require 'openwfe/service'
require 'openwfe/workitem'
require 'openwfe/util/irb'
require 'openwfe/util/workqueue'
require 'openwfe/util/treechecker'
-require 'openwfe/expool/parser'
+require 'openwfe/expool/def_parser'
require 'openwfe/expool/wfidgen'
require 'openwfe/expool/expressionpool'
require 'openwfe/expool/expstorage'
require 'openwfe/expool/errorjournal'
+require 'openwfe/engine/launch_methods'
require 'openwfe/engine/expool_methods'
require 'openwfe/engine/status_methods'
+require 'openwfe/engine/lookup_methods'
+require 'openwfe/engine/listener_methods'
require 'openwfe/engine/participant_methods'
require 'openwfe/engine/update_exp_methods'
require 'openwfe/expressions/environment'
-require 'openwfe/expressions/expressionmap'
-require 'openwfe/participants/participantmap'
+require 'openwfe/expressions/expression_map'
+require 'openwfe/participants/participant_map'
module OpenWFE
#
@@ -75,12 +62,15 @@
include OwfeServiceLocator
include FeiMixin
include ExpoolMethods
include StatusMethods
+ include LookupMethods
+ include ListenerMethods
include ParticipantMethods
include UpdateExpMethods
+ include LaunchMethods
#
# The name of the engine, will be used to 'stamp' each expression
# active in the engine (and thus indirectrly, each workitem)
@@ -93,24 +83,23 @@
# Accepts an optional initial application_context (containing
# initialization params for services for example).
#
# The engine itself uses one param :logger, used to define
# where all the log output for OpenWFEru should go.
- # By default, this output goes to logs/openwferu.log
+ # By default, this output goes to logs/ruote.log
#
def initialize (application_context={})
- super :s_engine, application_context
+ super(:s_engine, application_context)
- @engine_name = application_context[:engine_name] || 'engine'
+ @engine_name = (application_context[:engine_name] || 'engine').to_s
$OWFE_LOG = application_context[:logger]
unless $OWFE_LOG
- #puts "Creating logs in " + FileUtils.pwd
- FileUtils.mkdir("logs") unless File.exist?("logs")
- $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
+ FileUtils.mkdir('logs') unless File.exist?('logs')
+ $OWFE_LOG = Logger.new('logs/ruote.log', 10, 1024000)
$OWFE_LOG.level = Logger::INFO
end
# build order matters.
#
@@ -185,90 +174,10 @@
end
alias :reload :reschedule
#
- # When 'parameters' are used at the top of a process definition, this
- # method can be used to assert a launchitem before launch.
- # An expression will be raised if the parameters do not match the
- # requirements.
- #
- # Note that the launch method will raise those exceptions as well.
- # This method can be useful in some scenarii though.
- #
- def pre_launch_check (launchitem)
-
- get_expression_pool.prepare_raw_expression(launchitem)
- end
-
- #
- # Launches a [business] process.
- # The 'launch_object' param may contain either a LaunchItem instance,
- # either a String containing the URL of the process definition
- # to launch (with an empty LaunchItem created on the fly).
- #
- # The launch object can also be a String containing the XML process
- # definition or directly a class extending OpenWFE::ProcessDefinition
- # (Ruby process definition).
- #
- # Returns the FlowExpressionId instance of the expression at the
- # root of the newly launched process.
- #
- # Options for scheduled launches like :at, :in and :cron are accepted
- # via the 'options' optional parameter.
- # For example :
- #
- # engine.launch(launch_item)
- # # will launch immediately
- #
- # engine.launch(launch_item, :in => "1d20m")
- # # will launch in one day and twenty minutes
- #
- # engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
- # # will launch at that point in time
- #
- # engine.launch(launch_item, :cron => "0 5 * * *")
- # # will launch that same process every day,
- # # five minutes after midnight (see "man 5 crontab")
- #
- # === :wait_for
- #
- # If you really need that, you can launch a process and wait for its
- # termination (or cancellation or error) as in :
- #
- # engine.launch(launch_item, :wait_for => true)
- # # will launch and return only when the process is over
- #
- # Note that if you set the option :wait_for to true, a triplet will
- # be returned instead of just a FlowExpressionId.
- #
- # This triplet is composed of [ message, info, fei ]
- # where message is :terminate, :error or :cancel and info contains
- # either the workitem, the error or a wfid, respectively.
- #
- # See http://groups.google.com/group/openwferu-users/browse_frm/thread/ffd0589bdc877765 for more about this triplet.
- #
- # (Note that the current implementation of this :wait_for will return if
- # any error was found. Thus, if an error occurs in a concurrent branch
- # and the other branch goes on, the launch() will return, even if the
- # rest of the process is continuing).
- #
- def launch (launch_object, options={})
-
- launchitem = extract_launchitem launch_object
-
- fei = get_expression_pool.launch launchitem, options
-
- #linfo { "launch() #{fei.wfid} : #{fei.wfname} #{fei.wfrevision}" }
-
- fei.dup
- #
- # so that users of this launch() method can play with their
- # fei without breaking things
- end
-
- #
# This method is used to feed a workitem back to the engine (after
# it got sent to a worklist or wherever by a participant).
# Participant implementations themselves do call this method usually.
#
# This method also accepts LaunchItem instances.
@@ -301,13 +210,13 @@
return get_participant_map.onotify(
workitem.participant_name, :reply, workitem)
end
- raise \
+ raise(
"InFlowWorkitem doesn't belong to a process instance" +
- " nor to a participant"
+ " nor to a participant")
end
return get_expression_pool.launch(workitem) \
if workitem.is_a?(LaunchItem)
#
@@ -321,61 +230,10 @@
alias :forward :reply
alias :proceed :reply
#
- # Adds a workitem listener to this engine.
- #
- # The 'freq' parameters if present might indicate how frequently
- # the resource should be polled for incoming workitems.
- #
- # engine.add_workitem_listener(listener, "3m10s")
- # # every 3 minutes and 10 seconds
- #
- # engine.add_workitem_listener(listener, "0 22 * * 1-5")
- # # every weekday at 10pm
- #
- # TODO : block handling...
- #
- def add_workitem_listener (listener, freq=nil)
-
- name = nil
-
- if listener.kind_of?(Class)
-
- listener = init_service nil, listener
-
- name = listener.service_name
- else
-
- name = listener.name if listener.respond_to?(:name)
- name = "#{listener.class}::#{listener.object_id}" unless name
-
- @application_context[name] = listener
- end
-
- result = nil
-
- if freq
-
- freq = freq.to_s.strip
-
- result = if Rufus::Scheduler.is_cron_string(freq)
-
- get_scheduler.schedule(freq, listener)
- else
-
- get_scheduler.schedule_every(freq, listener)
- end
- end
-
- linfo { "add_workitem_listener() added '#{name}'" }
-
- result
- end
-
- #
# Makes the current thread join the engine's scheduler thread
#
# You can thus make an engine standalone with something like :
#
# require 'openwfe/engine/engine'
@@ -440,281 +298,158 @@
@application_context.each do |sname, service|
next if sname == self.service_name
- #if service.kind_of?(ServiceMixin)
if service.respond_to?(:stop)
service.stop
- linfo do
- "stop() stopped service '#{sname}' (#{service.class})"
- end
+ linfo { "stop() stopped service '#{sname}' (#{service.class})" }
end
end
linfo { "stop() stopped engine '#{@service_name}'" }
nil
end
+ protected
+
+ #--
+ # the following methods may get overridden upon extension
+ # see for example file_persisted_engine.rb
+ #++
+
#
- # Waits for a given process instance to terminate.
- # The method only exits when the flow terminates, but beware : if
- # the process already terminated, the method will never exit.
+ # Builds the ExpressionMap (the mapping between expression names
+ # and expression implementations).
#
- # The parameter can be a FlowExpressionId instance, for example the
- # one given back by a launch(), or directly a workflow instance id
- # (String).
+ def build_expression_map
+
+ @application_context[:s_expression_map] = ExpressionMap.new
+ #
+ # the expression map is not a Service anymore,
+ # it's a simple instance (that will be reused in other
+ # OpenWFEru components)
+ end
+
#
- # This method is mainly used in utests.
+ # This implementation builds a KotobaWfidGenerator instance and
+ # binds it in the engine context.
+ # There are other WfidGeneration implementations available, like
+ # UuidWfidGenerator or FieldWfidGenerator.
#
- def wait_for (fei_or_wfid)
+ def build_wfid_generator
- wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
- fei_or_wfid.workflow_instance_id
- else
- fei_or_wfid
- end
+ #init_service(:s_wfid_generator, DefaultWfidGenerator)
+ #init_service(:s_wfid_generator, UuidWfidGenerator)
+ init_service(:s_wfid_generator, KotobaWfidGenerator)
- get_expression_pool.send :wait_for, wfid
+ #g = FieldWfidGenerator.new(
+ # :s_wfid_generator, @application_context, "wfid")
+ #
+ # showing how to initialize a FieldWfidGenerator that
+ # will take as workflow instance id the value found in
+ # the field "wfid" of the LaunchItem.
end
#
- # Looks up a process variable in a process.
- # If fei_or_wfid is not given, will simply look in the
- # 'engine environment' (where the top level variables '//' do reside).
+ # Builds the workqueue where apply/reply work is queued
+ # and processed.
#
- def lookup_variable (var_name, fei_or_wfid=nil)
+ def build_workqueue
- return get_expression_pool.fetch_engine_environment[var_name] \
- unless fei_or_wfid
-
- fetch_exp(fei_or_wfid).lookup_variable var_name
+ init_service(:s_workqueue, WorkQueue)
end
#
- # Returns the variables set for a process or an expression.
+ # Builds the OpenWFEru expression pool (the core of the engine)
+ # and binds it in the engine context.
+ # There is only one implementation of the expression pool, so
+ # this method is usually never overriden.
#
- # If a process (wfid) is given, variables of the process environment
- # will be returned, else variables in the environment valid for the
- # expression (fei) will be returned.
- #
- # If nothing (or nil) is given, the variables set in the engine
- # environment will be returned.
- #
- def get_variables (fei_or_wfid=nil)
+ def build_expression_pool
- return get_expression_pool.fetch_engine_environment.variables \
- unless fei_or_wfid
-
- fetch_exp(fei_or_wfid).get_environment.variables
+ init_service(:s_expression_pool, ExpressionPool)
end
#
- # Returns an array of wfid (workflow instance ids) whose root
- # environment contains the given variable
+ # The implementation here builds an InMemoryExpressionStorage
+ # instance.
#
- # If there are no matches, an empty array will be returned.
+ # See FilePersistedEngine or CachedFilePersistedEngine for
+ # overrides of this method.
#
- # Regular expressions are accepted as values.
+ def build_expression_storage
+
+ init_service(:s_expression_storage, InMemoryExpressionStorage)
+ end
+
#
- # If no value is given, all processes with the given variable name
- # set will be returned.
+ # The ParticipantMap is a mapping between participant names
+ # (well rather regular expressions) and participant implementations
+ # (see http://openwferu.rubyforge.org/participants.html)
#
- def lookup_processes (var_name, value=nil)
+ def build_participant_map
- # TODO : maybe this would be better in the ExpressionPool
+ init_service(:s_participant_map, ParticipantMap)
+ end
- regexp = value.is_a?(Regexp) ? value : nil
+ #
+ # There is only one Scheduler implementation, that's the one
+ # built and bound here.
+ #
+ def build_scheduler
- envs = get_expression_storage.find_expressions(
- :include_classes => Environment)
+ @application_context[:s_scheduler] = Rufus::Scheduler.start_new(
+ :thread_name =>
+ "rufus scheduler for Ruote (engine #{self.object_id})")
- envs = envs.find_all do |env|
+ @application_context[:s_scheduler].extend(Logging)
- val = env.variables[var_name]
+ linfo { "build_scheduler() version is #{Rufus::Scheduler::VERSION}" }
+ end
- #(val and ((not regexp) or (regexp.match(val))))
- if val != nil
- if regexp
- regexp.match(val)
- elsif value
- val == value
- else
- true
- end
- else
- false
- end
- end
+ #
+ # The default implementation of this method uses an
+ # InMemoryErrorJournal (do not use in production).
+ #
+ def build_error_journal
- envs.collect { |env| env.fei.wfid }
-
- #envs.inject([]) do |r, env|
- # val = env.variables[var_name]
- # r << env.fei.wfid \
- # if (val and ((not regexp) or (regexp.match(val))))
- # r
- #end
- #
- # seems slower...
+ init_service(:s_error_journal, InMemoryErrorJournal)
end
- protected
+ #
+ # builds the tree checker (see lib/openwfe/util/treechecker.rb)
+ #
+ def build_tree_checker
- #--
- # the following methods may get overridden upon extension
- # see for example file_persisted_engine.rb
- #++
+ init_service(:s_tree_checker, OpenWFE::TreeChecker)
+ end
- #
- # Builds the ExpressionMap (the mapping between expression names
- # and expression implementations).
- #
- def build_expression_map
+ #
+ # builds the service that turn process definitions into runnable
+ # expression trees...
+ #
+ def build_def_parser
- @application_context[:s_expression_map] = ExpressionMap.new
- #
- # the expression map is not a Service anymore,
- # it's a simple instance (that will be reused in other
- # OpenWFEru components)
- end
+ init_service(:s_def_parser, DefParser)
+ end
- #
- # This implementation builds a KotobaWfidGenerator instance and
- # binds it in the engine context.
- # There are other WfidGeneration implementations available, like
- # UuidWfidGenerator or FieldWfidGenerator.
- #
- def build_wfid_generator
+ #
+ # Whether the :no_expstorage_cache is set, a CacheExpressionStorage
+ # will be set or not.
+ #
+ def init_storage (storage_class)
- #init_service :s_wfid_generator, DefaultWfidGenerator
- #init_service :s_wfid_generator, UuidWfidGenerator
- init_service :s_wfid_generator, KotobaWfidGenerator
-
- #g = FieldWfidGenerator.new(
- # :s_wfid_generator, @application_context, "wfid")
- #
- # showing how to initialize a FieldWfidGenerator that
- # will take as workflow instance id the value found in
- # the field "wfid" of the LaunchItem.
+ if @application_context[:no_expstorage_cache]
+ init_service(:s_expression_storage, storage_class)
+ else
+ init_service(:s_expression_storage, CacheExpressionStorage)
+ init_service(:s_expression_storage__1, storage_class)
end
-
- #
- # Builds the workqueue where apply/reply work is queued
- # and processed.
- #
- def build_workqueue
-
- init_service :s_workqueue, WorkQueue
- end
-
- #
- # Builds the OpenWFEru expression pool (the core of the engine)
- # and binds it in the engine context.
- # There is only one implementation of the expression pool, so
- # this method is usually never overriden.
- #
- def build_expression_pool
-
- init_service :s_expression_pool, ExpressionPool
- end
-
- #
- # The implementation here builds an InMemoryExpressionStorage
- # instance.
- #
- # See FilePersistedEngine or CachedFilePersistedEngine for
- # overrides of this method.
- #
- def build_expression_storage
-
- init_service :s_expression_storage, InMemoryExpressionStorage
- end
-
- #
- # The ParticipantMap is a mapping between participant names
- # (well rather regular expressions) and participant implementations
- # (see http://openwferu.rubyforge.org/participants.html)
- #
- def build_participant_map
-
- init_service :s_participant_map, ParticipantMap
- end
-
- #
- # There is only one Scheduler implementation, that's the one
- # built and bound here.
- #
- def build_scheduler
-
- @application_context[:s_scheduler] = Rufus::Scheduler.start_new(
- :thread_name =>
- "rufus scheduler for Ruote (engine #{self.object_id})")
-
- @application_context[:s_scheduler].extend Logging
-
- linfo { "build_scheduler() version is #{Rufus::Scheduler::VERSION}" }
- end
-
- #
- # The default implementation of this method uses an
- # InMemoryErrorJournal (do not use in production).
- #
- def build_error_journal
-
- init_service :s_error_journal, InMemoryErrorJournal
- end
-
- #
- # builds the tree checker (see lib/openwfe/util/treechecker.rb)
- #
- def build_tree_checker
-
- init_service :s_tree_checker, OpenWFE::TreeChecker
- end
-
- #
- # builds the service that turn process definitions into runnable
- # expression trees...
- #
- def build_def_parser
-
- init_service :s_def_parser, DefParser
- end
-
- #
- # Turns the raw launch request info into a LaunchItem instance.
- #
- def extract_launchitem (launch_object)
-
- if launch_object.kind_of?(OpenWFE::LaunchItem)
-
- launch_object
-
- elsif launch_object.kind_of?(Class)
-
- LaunchItem.new launch_object
-
- elsif launch_object.kind_of?(String)
-
- li = OpenWFE::LaunchItem.new
-
- if launch_object[0, 1] == '<' or launch_object.index("\n")
-
- li.workflow_definition_url = "field:__definition"
- li['__definition'] = launch_object
-
- else
-
- li.workflow_definition_url = launch_object
- end
-
- li
- end
- end
+ end
end
end