lib/openwfe/engine/engine.rb in ruote-0.9.19 vs lib/openwfe/engine/engine.rb in ruote-0.9.20

- old
+ new

@@ -1,45 +1,29 @@ -# #-- -# Copyright (c) 2006-2008, John Mettraux, Nicolas Modrzyk OpenWFE.org -# All rights reserved. +# Copyright (c) 2006-2009, John Mettraux, Nicolas Modrzyk OpenWFE.org # -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: # -# . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. # -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +# THE SOFTWARE. # -# . Neither the name of the "OpenWFE" nor the names of its contributors may be -# used to endorse or promote products derived from this software without -# specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE -# POSSIBILITY OF SUCH DAMAGE. +# Made in Japan. #++ -# -# -# "made in Japan" -# -# John Mettraux at openwfe.org -# Nicolas Modrzyk at openwfe.org -# - require 'logger' require 'fileutils' require 'rufus/scheduler' # gem 'rufus-scheduler' @@ -48,22 +32,25 @@ require 'openwfe/service' require 'openwfe/workitem' require 'openwfe/util/irb' require 'openwfe/util/workqueue' require 'openwfe/util/treechecker' -require 'openwfe/expool/parser' +require 'openwfe/expool/def_parser' require 'openwfe/expool/wfidgen' require 'openwfe/expool/expressionpool' require 'openwfe/expool/expstorage' require 'openwfe/expool/errorjournal' +require 'openwfe/engine/launch_methods' require 'openwfe/engine/expool_methods' require 'openwfe/engine/status_methods' +require 'openwfe/engine/lookup_methods' +require 'openwfe/engine/listener_methods' require 'openwfe/engine/participant_methods' require 'openwfe/engine/update_exp_methods' require 'openwfe/expressions/environment' -require 'openwfe/expressions/expressionmap' -require 'openwfe/participants/participantmap' +require 'openwfe/expressions/expression_map' +require 'openwfe/participants/participant_map' module OpenWFE # @@ -75,12 +62,15 @@ include OwfeServiceLocator include FeiMixin include ExpoolMethods include StatusMethods + include LookupMethods + include ListenerMethods include ParticipantMethods include UpdateExpMethods + include LaunchMethods # # The name of the engine, will be used to 'stamp' each expression # active in the engine (and thus indirectrly, each workitem) @@ -93,24 +83,23 @@ # Accepts an optional initial application_context (containing # initialization params for services for example). # # The engine itself uses one param :logger, used to define # where all the log output for OpenWFEru should go. - # By default, this output goes to logs/openwferu.log + # By default, this output goes to logs/ruote.log # def initialize (application_context={}) - super :s_engine, application_context + super(:s_engine, application_context) - @engine_name = application_context[:engine_name] || 'engine' + @engine_name = (application_context[:engine_name] || 'engine').to_s $OWFE_LOG = application_context[:logger] unless $OWFE_LOG - #puts "Creating logs in " + FileUtils.pwd - FileUtils.mkdir("logs") unless File.exist?("logs") - $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000 + FileUtils.mkdir('logs') unless File.exist?('logs') + $OWFE_LOG = Logger.new('logs/ruote.log', 10, 1024000) $OWFE_LOG.level = Logger::INFO end # build order matters. # @@ -185,90 +174,10 @@ end alias :reload :reschedule # - # When 'parameters' are used at the top of a process definition, this - # method can be used to assert a launchitem before launch. - # An expression will be raised if the parameters do not match the - # requirements. - # - # Note that the launch method will raise those exceptions as well. - # This method can be useful in some scenarii though. - # - def pre_launch_check (launchitem) - - get_expression_pool.prepare_raw_expression(launchitem) - end - - # - # Launches a [business] process. - # The 'launch_object' param may contain either a LaunchItem instance, - # either a String containing the URL of the process definition - # to launch (with an empty LaunchItem created on the fly). - # - # The launch object can also be a String containing the XML process - # definition or directly a class extending OpenWFE::ProcessDefinition - # (Ruby process definition). - # - # Returns the FlowExpressionId instance of the expression at the - # root of the newly launched process. - # - # Options for scheduled launches like :at, :in and :cron are accepted - # via the 'options' optional parameter. - # For example : - # - # engine.launch(launch_item) - # # will launch immediately - # - # engine.launch(launch_item, :in => "1d20m") - # # will launch in one day and twenty minutes - # - # engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007") - # # will launch at that point in time - # - # engine.launch(launch_item, :cron => "0 5 * * *") - # # will launch that same process every day, - # # five minutes after midnight (see "man 5 crontab") - # - # === :wait_for - # - # If you really need that, you can launch a process and wait for its - # termination (or cancellation or error) as in : - # - # engine.launch(launch_item, :wait_for => true) - # # will launch and return only when the process is over - # - # Note that if you set the option :wait_for to true, a triplet will - # be returned instead of just a FlowExpressionId. - # - # This triplet is composed of [ message, info, fei ] - # where message is :terminate, :error or :cancel and info contains - # either the workitem, the error or a wfid, respectively. - # - # See http://groups.google.com/group/openwferu-users/browse_frm/thread/ffd0589bdc877765 for more about this triplet. - # - # (Note that the current implementation of this :wait_for will return if - # any error was found. Thus, if an error occurs in a concurrent branch - # and the other branch goes on, the launch() will return, even if the - # rest of the process is continuing). - # - def launch (launch_object, options={}) - - launchitem = extract_launchitem launch_object - - fei = get_expression_pool.launch launchitem, options - - #linfo { "launch() #{fei.wfid} : #{fei.wfname} #{fei.wfrevision}" } - - fei.dup - # - # so that users of this launch() method can play with their - # fei without breaking things - end - - # # This method is used to feed a workitem back to the engine (after # it got sent to a worklist or wherever by a participant). # Participant implementations themselves do call this method usually. # # This method also accepts LaunchItem instances. @@ -301,13 +210,13 @@ return get_participant_map.onotify( workitem.participant_name, :reply, workitem) end - raise \ + raise( "InFlowWorkitem doesn't belong to a process instance" + - " nor to a participant" + " nor to a participant") end return get_expression_pool.launch(workitem) \ if workitem.is_a?(LaunchItem) # @@ -321,61 +230,10 @@ alias :forward :reply alias :proceed :reply # - # Adds a workitem listener to this engine. - # - # The 'freq' parameters if present might indicate how frequently - # the resource should be polled for incoming workitems. - # - # engine.add_workitem_listener(listener, "3m10s") - # # every 3 minutes and 10 seconds - # - # engine.add_workitem_listener(listener, "0 22 * * 1-5") - # # every weekday at 10pm - # - # TODO : block handling... - # - def add_workitem_listener (listener, freq=nil) - - name = nil - - if listener.kind_of?(Class) - - listener = init_service nil, listener - - name = listener.service_name - else - - name = listener.name if listener.respond_to?(:name) - name = "#{listener.class}::#{listener.object_id}" unless name - - @application_context[name] = listener - end - - result = nil - - if freq - - freq = freq.to_s.strip - - result = if Rufus::Scheduler.is_cron_string(freq) - - get_scheduler.schedule(freq, listener) - else - - get_scheduler.schedule_every(freq, listener) - end - end - - linfo { "add_workitem_listener() added '#{name}'" } - - result - end - - # # Makes the current thread join the engine's scheduler thread # # You can thus make an engine standalone with something like : # # require 'openwfe/engine/engine' @@ -440,281 +298,158 @@ @application_context.each do |sname, service| next if sname == self.service_name - #if service.kind_of?(ServiceMixin) if service.respond_to?(:stop) service.stop - linfo do - "stop() stopped service '#{sname}' (#{service.class})" - end + linfo { "stop() stopped service '#{sname}' (#{service.class})" } end end linfo { "stop() stopped engine '#{@service_name}'" } nil end + protected + + #-- + # the following methods may get overridden upon extension + # see for example file_persisted_engine.rb + #++ + # - # Waits for a given process instance to terminate. - # The method only exits when the flow terminates, but beware : if - # the process already terminated, the method will never exit. + # Builds the ExpressionMap (the mapping between expression names + # and expression implementations). # - # The parameter can be a FlowExpressionId instance, for example the - # one given back by a launch(), or directly a workflow instance id - # (String). + def build_expression_map + + @application_context[:s_expression_map] = ExpressionMap.new + # + # the expression map is not a Service anymore, + # it's a simple instance (that will be reused in other + # OpenWFEru components) + end + # - # This method is mainly used in utests. + # This implementation builds a KotobaWfidGenerator instance and + # binds it in the engine context. + # There are other WfidGeneration implementations available, like + # UuidWfidGenerator or FieldWfidGenerator. # - def wait_for (fei_or_wfid) + def build_wfid_generator - wfid = if fei_or_wfid.kind_of?(FlowExpressionId) - fei_or_wfid.workflow_instance_id - else - fei_or_wfid - end + #init_service(:s_wfid_generator, DefaultWfidGenerator) + #init_service(:s_wfid_generator, UuidWfidGenerator) + init_service(:s_wfid_generator, KotobaWfidGenerator) - get_expression_pool.send :wait_for, wfid + #g = FieldWfidGenerator.new( + # :s_wfid_generator, @application_context, "wfid") + # + # showing how to initialize a FieldWfidGenerator that + # will take as workflow instance id the value found in + # the field "wfid" of the LaunchItem. end # - # Looks up a process variable in a process. - # If fei_or_wfid is not given, will simply look in the - # 'engine environment' (where the top level variables '//' do reside). + # Builds the workqueue where apply/reply work is queued + # and processed. # - def lookup_variable (var_name, fei_or_wfid=nil) + def build_workqueue - return get_expression_pool.fetch_engine_environment[var_name] \ - unless fei_or_wfid - - fetch_exp(fei_or_wfid).lookup_variable var_name + init_service(:s_workqueue, WorkQueue) end # - # Returns the variables set for a process or an expression. + # Builds the OpenWFEru expression pool (the core of the engine) + # and binds it in the engine context. + # There is only one implementation of the expression pool, so + # this method is usually never overriden. # - # If a process (wfid) is given, variables of the process environment - # will be returned, else variables in the environment valid for the - # expression (fei) will be returned. - # - # If nothing (or nil) is given, the variables set in the engine - # environment will be returned. - # - def get_variables (fei_or_wfid=nil) + def build_expression_pool - return get_expression_pool.fetch_engine_environment.variables \ - unless fei_or_wfid - - fetch_exp(fei_or_wfid).get_environment.variables + init_service(:s_expression_pool, ExpressionPool) end # - # Returns an array of wfid (workflow instance ids) whose root - # environment contains the given variable + # The implementation here builds an InMemoryExpressionStorage + # instance. # - # If there are no matches, an empty array will be returned. + # See FilePersistedEngine or CachedFilePersistedEngine for + # overrides of this method. # - # Regular expressions are accepted as values. + def build_expression_storage + + init_service(:s_expression_storage, InMemoryExpressionStorage) + end + # - # If no value is given, all processes with the given variable name - # set will be returned. + # The ParticipantMap is a mapping between participant names + # (well rather regular expressions) and participant implementations + # (see http://openwferu.rubyforge.org/participants.html) # - def lookup_processes (var_name, value=nil) + def build_participant_map - # TODO : maybe this would be better in the ExpressionPool + init_service(:s_participant_map, ParticipantMap) + end - regexp = value.is_a?(Regexp) ? value : nil + # + # There is only one Scheduler implementation, that's the one + # built and bound here. + # + def build_scheduler - envs = get_expression_storage.find_expressions( - :include_classes => Environment) + @application_context[:s_scheduler] = Rufus::Scheduler.start_new( + :thread_name => + "rufus scheduler for Ruote (engine #{self.object_id})") - envs = envs.find_all do |env| + @application_context[:s_scheduler].extend(Logging) - val = env.variables[var_name] + linfo { "build_scheduler() version is #{Rufus::Scheduler::VERSION}" } + end - #(val and ((not regexp) or (regexp.match(val)))) - if val != nil - if regexp - regexp.match(val) - elsif value - val == value - else - true - end - else - false - end - end + # + # The default implementation of this method uses an + # InMemoryErrorJournal (do not use in production). + # + def build_error_journal - envs.collect { |env| env.fei.wfid } - - #envs.inject([]) do |r, env| - # val = env.variables[var_name] - # r << env.fei.wfid \ - # if (val and ((not regexp) or (regexp.match(val)))) - # r - #end - # - # seems slower... + init_service(:s_error_journal, InMemoryErrorJournal) end - protected + # + # builds the tree checker (see lib/openwfe/util/treechecker.rb) + # + def build_tree_checker - #-- - # the following methods may get overridden upon extension - # see for example file_persisted_engine.rb - #++ + init_service(:s_tree_checker, OpenWFE::TreeChecker) + end - # - # Builds the ExpressionMap (the mapping between expression names - # and expression implementations). - # - def build_expression_map + # + # builds the service that turn process definitions into runnable + # expression trees... + # + def build_def_parser - @application_context[:s_expression_map] = ExpressionMap.new - # - # the expression map is not a Service anymore, - # it's a simple instance (that will be reused in other - # OpenWFEru components) - end + init_service(:s_def_parser, DefParser) + end - # - # This implementation builds a KotobaWfidGenerator instance and - # binds it in the engine context. - # There are other WfidGeneration implementations available, like - # UuidWfidGenerator or FieldWfidGenerator. - # - def build_wfid_generator + # + # Whether the :no_expstorage_cache is set, a CacheExpressionStorage + # will be set or not. + # + def init_storage (storage_class) - #init_service :s_wfid_generator, DefaultWfidGenerator - #init_service :s_wfid_generator, UuidWfidGenerator - init_service :s_wfid_generator, KotobaWfidGenerator - - #g = FieldWfidGenerator.new( - # :s_wfid_generator, @application_context, "wfid") - # - # showing how to initialize a FieldWfidGenerator that - # will take as workflow instance id the value found in - # the field "wfid" of the LaunchItem. + if @application_context[:no_expstorage_cache] + init_service(:s_expression_storage, storage_class) + else + init_service(:s_expression_storage, CacheExpressionStorage) + init_service(:s_expression_storage__1, storage_class) end - - # - # Builds the workqueue where apply/reply work is queued - # and processed. - # - def build_workqueue - - init_service :s_workqueue, WorkQueue - end - - # - # Builds the OpenWFEru expression pool (the core of the engine) - # and binds it in the engine context. - # There is only one implementation of the expression pool, so - # this method is usually never overriden. - # - def build_expression_pool - - init_service :s_expression_pool, ExpressionPool - end - - # - # The implementation here builds an InMemoryExpressionStorage - # instance. - # - # See FilePersistedEngine or CachedFilePersistedEngine for - # overrides of this method. - # - def build_expression_storage - - init_service :s_expression_storage, InMemoryExpressionStorage - end - - # - # The ParticipantMap is a mapping between participant names - # (well rather regular expressions) and participant implementations - # (see http://openwferu.rubyforge.org/participants.html) - # - def build_participant_map - - init_service :s_participant_map, ParticipantMap - end - - # - # There is only one Scheduler implementation, that's the one - # built and bound here. - # - def build_scheduler - - @application_context[:s_scheduler] = Rufus::Scheduler.start_new( - :thread_name => - "rufus scheduler for Ruote (engine #{self.object_id})") - - @application_context[:s_scheduler].extend Logging - - linfo { "build_scheduler() version is #{Rufus::Scheduler::VERSION}" } - end - - # - # The default implementation of this method uses an - # InMemoryErrorJournal (do not use in production). - # - def build_error_journal - - init_service :s_error_journal, InMemoryErrorJournal - end - - # - # builds the tree checker (see lib/openwfe/util/treechecker.rb) - # - def build_tree_checker - - init_service :s_tree_checker, OpenWFE::TreeChecker - end - - # - # builds the service that turn process definitions into runnable - # expression trees... - # - def build_def_parser - - init_service :s_def_parser, DefParser - end - - # - # Turns the raw launch request info into a LaunchItem instance. - # - def extract_launchitem (launch_object) - - if launch_object.kind_of?(OpenWFE::LaunchItem) - - launch_object - - elsif launch_object.kind_of?(Class) - - LaunchItem.new launch_object - - elsif launch_object.kind_of?(String) - - li = OpenWFE::LaunchItem.new - - if launch_object[0, 1] == '<' or launch_object.index("\n") - - li.workflow_definition_url = "field:__definition" - li['__definition'] = launch_object - - else - - li.workflow_definition_url = launch_object - end - - li - end - end + end end end