# #-- # Copyright (c) 2006-2008, John Mettraux, Nicolas Modrzyk OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # . Redistributions of source code must retain the above copyright notice, this # list of conditions and the following disclaimer. # # . Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. 
#++
#
#
# "made in Japan"
#
# John Mettraux at openwfe.org
# Nicolas Modrzyk at openwfe.org
#

require 'logger'
require 'fileutils'

require 'rufus/scheduler' # gem 'rufus-scheduler'

require 'openwfe/omixins'
require 'openwfe/rudefinitions'
require 'openwfe/service'
require 'openwfe/workitem'
require 'openwfe/util/irb'
require 'openwfe/expool/wfidgen'
require 'openwfe/expool/expressionpool'
require 'openwfe/expool/expstorage'
require 'openwfe/expool/errorjournal'
require 'openwfe/engine/process_status'
require 'openwfe/expressions/environment'
require 'openwfe/expressions/expressionmap'
require 'openwfe/participants/participantmap'


module OpenWFE

    #
    # The simplest implementation of the OpenWFE workflow engine.
    # No persistence is used, everything is stored in memory.
    #
    class Engine < Service
        include OwfeServiceLocator
        include FeiMixin
        include StatusMixin

        #
        # Builds an OpenWFEru engine.
        #
        # Accepts an optional initial application_context (containing
        # initialization params for services for example).
        #
        # The engine itself uses one param :logger, used to define
        # where all the log output for OpenWFEru should go.
        # By default, this output goes to logs/openwferu.log
        #
        def initialize(application_context={})

            super S_ENGINE, application_context

            # the logger is kept in a global so that it can be reached
            # from anywhere; it is (re)assigned on each engine creation
            $OWFE_LOG = application_context[:logger]

            unless $OWFE_LOG
                #puts "Creating logs in " + FileUtils.pwd
                FileUtils.mkdir("logs") unless File.exist?("logs")
                $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000
                $OWFE_LOG.level = Logger::INFO
            end

            # build order matters.
            #
            # especially for the expstorage which 'observes' the expression
            # pool and thus needs to be instantiated after it.

            build_scheduler
                #
                # for delayed or repetitive executions (it's the engine's
                # clock)
                # see http://openwferu.rubyforge.org/scheduler.html

            build_expression_map
                #
                # mapping expression names ('sequence', 'if', 'concurrence',
                # 'when'...) to their implementations (SequenceExpression,
                # IfExpression, ConcurrenceExpression, ...)

            build_wfid_generator
                #
                # the workflow instance (process instance) id generator
                # making sure each process instance has a unique identifier

            build_expression_pool
                #
                # the core (hairy ball) of the engine

            build_expression_storage
                #
                # the engine persistence (persisting the expression instances
                # that make up process instances)

            build_participant_map
                #
                # building the services that maps participant names to
                # participant implementations / instances.

            build_error_journal
                #
                # builds the error journal (keeping track of failures
                # in business process executions, and an opportunity to
                # fix and replay)

            linfo { "new() --- engine started --- #{self.object_id}" }
        end

        #
        # Call this method once the participants for a persisted engine
        # have been [re]added.
        #
        # If this method is called too soon, missing participants will
        # cause trouble... Call this method after all the participants
        # have been added.
        #
        def reschedule
            get_expression_pool.reschedule()
        end

        alias :reload :reschedule

        #
        # When 'parameters' are used at the top of a process definition, this
        # method can be used to assert a launchitem before launch.
        # An exception will be raised if the parameters do not match the
        # requirements.
        #
        # Note that the launch method will raise those exceptions as well.
        # This method can be useful in some scenarios though.
        #
        def pre_launch_check(launchitem)
            get_expression_pool.prepare_raw_expression(launchitem)
        end

        #
        # Launches a [business] process.
        # The 'launch_object' param may contain either a LaunchItem instance,
        # or a String containing the URL of the process definition
        # to launch (with an empty LaunchItem created on the fly).
        #
        # The launch object can also be a String containing the XML process
        # definition or directly a class extending OpenWFE::ProcessDefinition
        # (Ruby process definition).
        #
        # Any other kind of launch object is rejected with an ArgumentError.
        #
        # Returns the FlowExpressionId instance of the expression at the
        # root of the newly launched process.
        #
        # Options for scheduled launches like :at, :in and :cron are accepted
        # via the 'options' optional parameter.
        # For example :
        #
        #     engine.launch(launch_item)
        #         # will launch immediately
        #
        #     engine.launch(launch_item, :in => "1d20m")
        #         # will launch in one day and twenty minutes
        #
        #     engine.launch(launch_item, :at => "Tue Sep 11 20:23:02 +0900 2007")
        #         # will launch at that point in time
        #
        #     engine.launch(launch_item, :cron => "0 5 * * *")
        #         # will launch that same process every day,
        #         # five minutes after midnight (see "man 5 crontab")
        #
        def launch(launch_object, options={})

            launchitem = extract_launchitem(launch_object)

            fei = get_expression_pool.launch(launchitem, options)

            fei.dup
                #
                # so that users of this launch() method can play with their
                # fei without breaking things
        end

        #
        # This method is used to feed a workitem back to the engine (after
        # it got sent to a worklist or wherever by a participant).
        # Participant implementations themselves do call this method usually.
        #
        # This method also accepts LaunchItem instances.
        #
        # Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem
        # that don't belong to a process instance (ie whose flow_expression_id
        # is nil). It will simply notify the participant_map of the reply
        # for the given participant_name. If there is no participant_name
        # specified for this orphan workitem, an exception will be raised.
        #
        # An exception is raised as well when the argument is neither an
        # InFlowWorkItem nor a LaunchItem.
        #
        def reply(workitem)

            if workitem.is_a?(InFlowWorkItem)

                if workitem.flow_expression_id
                    #
                    # vanilla case, workitem coming back
                    # (from listener probably)

                    return get_expression_pool.reply(
                        workitem.flow_expression_id, workitem)
                end

                if workitem.participant_name
                    #
                    # a workitem that doesn't belong to a process instance
                    # but bears a participant name.
                    # Notify, there may be something listening on
                    # this channel (see the 'listen' expression).

                    return get_participant_map.onotify(
                        workitem.participant_name, :reply, workitem)
                end

                raise \
                    "InFlowWorkitem doesn't belong to a process instance" +
                    " nor to a participant"
            end

            return get_expression_pool.launch(workitem) \
                if workitem.is_a?(LaunchItem)
                    #
                    # launchitem coming from listener
                    # let's attempt to launch a new process instance

            raise \
                "engine.reply() " +
                "cannot handle instances of #{workitem.class}"
        end

        alias :forward :reply
        alias :proceed :reply

        #
        # Registers a participant in this [embedded] engine.
        # This method is a shortcut to the ParticipantMap method
        # with the same name.
        #
        #     engine.register_participant "user-.*", HashParticipant
        #
        # or
        #
        #     engine.register_participant "user-.*" do |wi|
        #         puts "participant '#{wi.participant_name}' received a workitem"
        #         #
        #         # and did nothing with it
        #         # as a block participant implicitly returns the workitem
        #         # to the engine
        #     end
        #
        # Returns the participant instance.
        #
        # The participant parameter can be set to a hash like in
        #
        #     engine.register_participant(
        #         "alpha",
        #         { :participant => HashParticipant, :position => :first })
        #
        # or
        #
        #     engine.register_participant("alpha", :position => :first) do
        #         puts "first !"
        #     end
        #
        # There are some times where you have to position a participant first
        # (especially with the regex technique).
        #
        # see ParticipantMap#register_participant
        #
        def register_participant(regex, participant=nil, &block)

            #get_participant_map.register_participant(
            #    regex, participant, &block)

            # exact class check (not is_a?) : only a plain Hash is taken
            # as a set of params, anything else is wrapped as the
            # participant itself
            params = if participant.class == Hash
                participant
            else
                { :participant => participant }
            end

            get_participant_map.register_participant(regex, params, &block)
        end

        #
        # Given a participant name, returns the participant in charge
        # of handling workitems for that name.
        # May be useful in some embedded contexts.
        #
        def get_participant(participant_name)
            get_participant_map.lookup_participant(participant_name)
        end

        #
        # Removes the first participant matching the given name from the
        # participant map kept by the engine.
        #
        def unregister_participant(participant_name)
            get_participant_map.unregister_participant(participant_name)
        end

        #
        # Adds a workitem listener to this engine.
        #
        # The 'listener' may be a class (it then gets instantiated as a
        # service of this engine) or an already built instance (it then
        # gets bound in the application context under its name, or under
        # "Class::object_id" if it has no name).
        #
        # The 'freq' parameter, if present, indicates how frequently
        # the resource should be polled for incoming workitems.
        #
        #     engine.add_workitem_listener(listener, "3m10s")
        #         # every 3 minutes and 10 seconds
        #
        #     engine.add_workitem_listener(listener, "0 22 * * 1-5")
        #         # every weekday at 10pm
        #
        # Returns what the scheduler returned when scheduling the listener,
        # or nil if no 'freq' was given.
        #
        # TODO : block handling...
        #
        def add_workitem_listener(listener, freq=nil)

            name = nil

            if listener.kind_of?(Class)

                listener = init_service(nil, listener)

                name = listener.service_name
            else

                name = listener.name if listener.respond_to?(:name)
                name = "#{listener.class}::#{listener.object_id}" unless name

                @application_context[name] = listener
            end

            result = nil

            if freq

                freq = freq.to_s.strip

                # a cron string gets a cron schedule, anything else is
                # taken as an 'every' frequency
                result = if Rufus::Scheduler.is_cron_string(freq)
                    get_scheduler.schedule(freq, listener)
                else
                    get_scheduler.schedule_every(freq, listener)
                end
            end

            linfo { "add_workitem_listener() added '#{name}'" }

            result
        end

        #
        # Makes the current thread join the engine's scheduler thread
        #
        # You can thus make an engine standalone with something like :
        #
        #     require 'openwfe/engine/engine'
        #
        #     the_engine = OpenWFE::Engine.new
        #     the_engine.join
        #
        # And you'll have to hit CTRL-C to make it stop.
        #
        def join
            get_scheduler.join
        end

        #
        # Calling this method makes the control flow block until the
        # workflow engine is inactive.
        #
        # The engine is considered inactive as soon as its expression
        # storage holds at most one expression; the storage is polled
        # once per second.
        #
        # TODO : implement idle_for
        #
        def join_until_idle

            storage = get_expression_storage

            sleep 1 while storage.size > 1
        end

        #
        # Enabling the console means that hitting CTRL-C on the window /
        # term / dos box / whatever runs the OpenWFEru engine will
        # open an IRB interactive console for directly manipulating the
        # engine instance.
        #
        # Hit CTRL-D to get out of the console.
        #
        def enable_irb_console
            OpenWFE::trap_int_irb(binding)
        end

        #--
        # Makes sure that hitting CTRL-C will actually kill the engine VM and
        # not open an IRB console.
        #
        #def disable_irb_console
        #    $openwfe_irb = nil
        #    trap 'INT' do
        #        exit 0
        #    end
        #end
        #++

        #
        # Stopping the engine will stop all the services in the
        # application context.
        #
        # Entries of the application context that do not respond to
        # 'stop' are left alone. Always returns nil.
        #
        def stop

            linfo { "stop() stopping engine '#{@service_name}'" }

            @application_context.each do |sname, service|

                # the engine itself sits in the context, do not recurse
                next if sname == self.service_name

                #if service.kind_of?(ServiceMixin)
                if service.respond_to?(:stop)

                    service.stop

                    linfo do
                        "stop() stopped service '#{sname}' (#{service.class})"
                    end
                end
            end

            linfo { "stop() stopped engine '#{@service_name}'" }

            nil
        end

        #
        # Waits for a given process instance to terminate.
        # The method only exits when the flow terminates (or when an error
        # is reported for it), but beware : if the process already
        # terminated, the method will never exit.
        #
        # The parameter can be a FlowExpressionId instance, for example the
        # one given back by a launch(), or directly a workflow instance id
        # (String).
        #
        # This method is mainly used in utests.
        #
        # NOTE(review) : the helper thread parks itself with Thread.stop and
        # is resumed via Thread#wakeup; a wakeup issued before the thread
        # has actually stopped would presumably be lost -- to be confirmed.
        # The observers are not removed if the join gets interrupted.
        #
        def wait_for(fei_or_wfid)

            wfid = if fei_or_wfid.kind_of?(FlowExpressionId)
                fei_or_wfid.workflow_instance_id
            else
                fei_or_wfid
            end

            # a thread that sleeps until one of the observers wakes it up
            t = Thread.new { Thread.stop }

            to = get_expression_pool.add_observer(:terminate) do |c, fe, wi|
                t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?)
            end
            te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e|
                t.wakeup if (fei.parent_wfid == wfid and t.alive?)
            end
            #tc = get_expression_pool.add_observer(:cancel) do |c, fe|
            #    if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?)
            #        sleep 0.500
            #        t.wakeup
            #    end
            #end

            linfo { "wait_for() #{wfid}" }

            t.join

            get_expression_pool.remove_observer(to, :terminate)
            get_expression_pool.remove_observer(te, :error)
            #get_expression_pool.remove_observer(tc, :cancel)
                #
                # it would work as well without specifying the channel,
                # but it's thus a little bit faster
        end

        #--
        # METHODS FROM THE EXPRESSION POOL
        #
        # These methods are 'proxies' to methods found in the expression
        # pool. They are made available here for a simpler model.
        #++

        #
        # Returns the list of applied expressions belonging to a given
        # workflow instance.
        # May be used to determine where a process instance currently is.
        #
        # This method returns all the expressions (the stack) a process
        # went through to reach its current state.
        #
        # If the unapplied optional parameter is set to true, all the
        # expressions (even those not yet applied) that compose the process
        # instance will be returned.
        #
        def process_stack(workflow_instance_id, unapplied=false)
            get_expression_pool.process_stack(workflow_instance_id, unapplied)
        end

        alias :get_process_stack :process_stack
        alias :get_flow_stack :process_stack

        #
        # Lists all workflow (process) instances currently in the expool (in
        # the engine).
        # This method will return a list of "process-definition" expressions
        # (i.e. OpenWFE::DefineExpression objects -- each representing the root
        # element of a flow).
        #
        # :wfid ::
        #     will list only one process,
        #     <tt>:wfid => '20071208-gipijiwozo'</tt>
        # :parent_wfid ::
        #     will list only one process, and its subprocesses,
        #     <tt>:parent_wfid => '20071208-gipijiwozo'</tt>
        # :consider_subprocesses ::
        #     if true, "process-definition" expressions
        #     of subprocesses will be returned as well.
        # :wfid_prefix ::
        #     allows you to query for specific workflow instance
        #     id prefixes. for example :
        #     <tt>:wfid_prefix => "200712"</tt>
        #     for the processes started in December.
        # :wfname ::
        #     will return only the process instances that belong to the given
        #     workflow [name].
        # :wfrevision ::
        #     used in conjunction with :wfname, returns only the process
        #     instances of a given workflow revision.
        #
        def list_processes(options={})
            get_expression_pool.list_processes(options)
        end

        alias :list_workflows :list_processes

        #
        # Given any expression of a process, cancels the complete process
        # instance.
        #
        def cancel_process(exp_or_wfid)
            get_expression_pool.cancel_process(exp_or_wfid)
        end

        alias :cancel_flow :cancel_process
        alias :abort_process :cancel_process

        #
        # Cancels the given expression (and its children if any)
        # (warning : advanced method)
        #
        # Cancelling the root expression of a process is equivalent to
        # cancelling the process.
        #
        def cancel_expression(exp_or_fei)
            get_expression_pool.cancel_expression(exp_or_fei)
        end

        #
        # Forgets the given expression (make it an orphan)
        # (warning : advanced method)
        #
        def forget_expression(exp_or_fei)
            get_expression_pool.forget(exp_or_fei)
        end

        #
        # Pauses a process (sets its '/__paused__' variable to true).
        #
        def pause_process(wfid)

            wfid = extract_wfid(wfid)

            # fetched first : nothing gets flagged if the lookup fails
            root_expression = get_expression_pool.fetch_root(wfid)

            get_expression_pool.paused_instances[wfid] = true
            root_expression.set_variable(VAR_PAUSED, true)
        end

        #
        # Restarts a process : removes its 'paused' flag (variable) and makes
        # sure to 'replay' events (replies) that came for it while it was
        # in pause.
        #
        def resume_process(wfid)

            wfid = extract_wfid(wfid)

            root_expression = get_expression_pool.fetch_root(wfid)

            #
            # remove 'paused' flag

            get_expression_pool.paused_instances.delete(wfid)
            root_expression.unset_variable(VAR_PAUSED)

            #
            # replay
            #
            # select PausedError instances in separate list

            errors = get_error_journal.get_error_log(wfid)
            error_class = PausedError.name

            paused_errors = errors.select { |e| e.error_class == error_class }

            return if paused_errors.size < 1

            # replay the selected PausedError instances

            paused_errors.each do |e|
                replay_at_error e
            end
        end

        #
        # Takes care of removing an error from the error journal and
        # then replays its process at that point.
        #
        def replay_at_error(error)

            get_error_journal.remove_errors(
                error.fei.parent_wfid,
                error)

            get_expression_pool.queue_work(
                error.message,
                error.fei,
                error.workitem)
        end

        #
        # Looks up a process variable in a process.
        # If fei_or_wfid is not given, will simply look in the
        # 'engine environment' (where the top level variables '//' do reside).
        #
        def lookup_variable(var_name, fei_or_wfid=nil)

            return get_expression_pool.fetch_engine_environment[var_name] \
                unless fei_or_wfid

            fetch_exp(fei_or_wfid).lookup_variable(var_name)
        end

        #
        # Returns the variables set for a process or an expression.
        #
        # If a process (wfid) is given, variables of the process environment
        # will be returned, else variables in the environment valid for the
        # expression (fei) will be returned.
        #
        # If nothing (or nil) is given, the variables set in the engine
        # environment will be returned.
        #
        def get_variables(fei_or_wfid=nil)

            return get_expression_pool.fetch_engine_environment.variables \
                unless fei_or_wfid

            fetch_exp(fei_or_wfid).get_environment.variables
        end

        #
        # Returns an array of wfid (workflow instance ids) whose root
        # environment contains the given variable
        #
        # If there are no matches, an empty array will be returned.
        #
        # Regular expressions are accepted as values. A value that is not
        # a Regexp is turned into one (via its string representation).
        #
        # The match is done against the string representation of the
        # variable value, so that variables holding something else than
        # a String (numbers for example) can be looked up as well.
        #
        # If no value is given, all processes with the given variable name
        # set will be returned (a variable holding nil or false is
        # considered as not set).
        #
        def lookup_processes(var_name, value=nil)

            # TODO : maybe this would be better in the ExpressionPool

            regexp = if value.is_a?(Regexp)
                value
            elsif value
                Regexp.compile(value.to_s)
            else
                nil
            end

            envs = get_expression_storage.find_expressions(
                :include_classes => Environment)

            envs = envs.find_all do |env|

                val = env.variables[var_name]

                # val.to_s : Regexp#match raises a TypeError when handed
                # something that is neither a String nor a Symbol
                (val && (regexp.nil? || regexp.match(val.to_s)))
            end
            envs.collect do |env|
                env.fei.wfid
            end

            #envs.inject([]) do |r, env|
            #    val = env.variables[var_name]
            #    r << env.fei.wfid \
            #        if (val and ((not regexp) or (regexp.match(val))))
            #    r
            #end
                #
                # seems slower...
        end

        #
        # Use only when doing "process gardening".
        #
        # This method updates an expression, the 'data' parameter is expected
        # to be a hash. If the expression is an Environment, the variables
        # will be merged with the ones found in the data param.
        # If the expression is not an Environment, the data will be merged
        # into the 'applied_workitem' if any.
        #
        # If the merge is not possible, an exception will be raised.
        #
        def update_expression_data(fei, data)

            fexp = fetch_exp(fei)

            original = if fexp.is_a?(Environment)
                fexp.variables
            else
                fexp.applied_workitem.attributes
            end

            # in-place merge, then the modified expression is handed back
            # to the pool
            original.merge!(data)

            get_expression_pool.update(fexp)
        end

        #
        # A variant of update_expression() that directly replaces
        # the raw representation stored within a RawExpression.
        #
        # Useful for modifying [not yet reached] segments of processes.
        #
        # Raises if the expression found is not a RawExpression.
        #
        def update_raw_expression(fei, representation)

            fexp = fetch_exp(fei)

            raise "cannot update already applied expression" \
                unless fexp.is_a?(RawExpression)

            fexp.raw_representation = representation

            get_expression_pool.update(fexp)
        end

        #
        # Replaces an expression in the pool with a newer version of it.
        #
        # (useful when fixing processes on the fly)
        #
        def update_expression(fexp)

            fexp.application_context = application_context

            get_expression_pool.update(fexp)
        end

        protected

            #--
            # the following methods may get overridden upon extension
            # see for example file_persisted_engine.rb
            #++

            #
            # Binds a new ExpressionMap instance in the engine context.
            #
            def build_expression_map

                @application_context[S_EXPRESSION_MAP] = ExpressionMap.new
                    #
                    # the expression map is not a Service anymore,
                    # it's a simple instance (that will be reused in other
                    # OpenWFEru components)
            end

            #
            # This implementation builds a KotobaWfidGenerator instance and
            # binds it in the engine context.
            # There are other WfidGeneration implementations available, like
            # UuidWfidGenerator or FieldWfidGenerator.
            #
            def build_wfid_generator

                #init_service S_WFID_GENERATOR, DefaultWfidGenerator
                #init_service S_WFID_GENERATOR, UuidWfidGenerator
                init_service S_WFID_GENERATOR, KotobaWfidGenerator

                #g = FieldWfidGenerator.new(
                #    S_WFID_GENERATOR, @application_context, "wfid")
                    #
                    # showing how to initialize a FieldWfidGenerator that
                    # will take as workflow instance id the value found in
                    # the field "wfid" of the LaunchItem.
            end

            #
            # Builds the OpenWFEru expression pool (the core of the engine)
            # and binds it in the engine context.
            # There is only one implementation of the expression pool, so
            # this method is usually never overridden.
            #
            def build_expression_pool

                init_service S_EXPRESSION_POOL, ExpressionPool
            end

            #
            # The implementation here builds an InMemoryExpressionStorage
            # instance.
            #
            # See FilePersistedEngine or CachedFilePersistedEngine for
            # overrides of this method.
            #
            def build_expression_storage

                init_service S_EXPRESSION_STORAGE, InMemoryExpressionStorage
            end

            #
            # The ParticipantMap is a mapping between participant names
            # (well rather regular expressions) and participant implementations
            # (see http://openwferu.rubyforge.org/participants.html)
            #
            def build_participant_map

                init_service S_PARTICIPANT_MAP, ParticipantMap
            end

            #
            # There is only one Scheduler implementation, that's the one
            # built, bound and started here.
            #
            def build_scheduler

                scheduler = Rufus::Scheduler.new

                @application_context[S_SCHEDULER] = scheduler

                scheduler.start
            end

            #
            # The default implementation of this method uses an
            # InMemoryErrorJournal (do not use in production).
            #
            def build_error_journal

                init_service S_ERROR_JOURNAL, InMemoryErrorJournal
            end

            #
            # Turns the raw launch request info into a LaunchItem instance.
            #
            # A LaunchItem is returned as is, a Class gets wrapped in a new
            # LaunchItem, and a String is considered as an inline process
            # definition if it starts with '<' or contains a newline, else
            # as the URL of a process definition.
            #
            # Raises an ArgumentError for any other kind of launch object
            # (instead of silently handing nil to the expression pool).
            #
            def extract_launchitem(launch_object)

                if launch_object.kind_of?(OpenWFE::LaunchItem)

                    launch_object

                elsif launch_object.kind_of?(Class)

                    LaunchItem.new launch_object

                elsif launch_object.kind_of?(String)

                    li = OpenWFE::LaunchItem.new

                    if launch_object[0, 1] == '<' or launch_object.index("\n")

                        # inline definition : stored in a field and
                        # referenced via a 'field:' URL
                        li.workflow_definition_url = "field:__definition"
                        li['__definition'] = launch_object

                    else

                        li.workflow_definition_url = launch_object
                    end

                    li

                else

                    raise(
                        ArgumentError,
                        "cannot launch instances of #{launch_object.class}")
                end
            end

            #
            # In case of wfid, returns the root expression of the process,
            # in case of fei, returns the expression itself.
            #
            # Raises if no expression could be found.
            #
            def fetch_exp(fei_or_wfid)

                exp = if fei_or_wfid.is_a?(String)

                    get_expression_pool.fetch_root fei_or_wfid

                else

                    get_expression_pool.fetch_expression fei_or_wfid
                end

                exp or raise "no expression found for '#{fei_or_wfid.to_s}'"
            end

    end

end