lib/openwfe/engine/engine.rb in openwferu-0.9.16 vs lib/openwfe/engine/engine.rb in openwferu-0.9.17

- old
+ new

@@ -1,8 +1,8 @@ # #-- -# Copyright (c) 2006-2007, John Mettraux, Nicolas Modrzyk OpenWFE.org +# Copyright (c) 2006-2008, John Mettraux, Nicolas Modrzyk OpenWFE.org # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # @@ -39,21 +39,22 @@ # require 'logger' require 'fileutils' +require 'rufus/scheduler' # gem 'rufus-scheduler' + require 'openwfe/omixins' require 'openwfe/rudefinitions' require 'openwfe/service' require 'openwfe/workitem' require 'openwfe/util/irb' -require 'openwfe/util/scheduler' -require 'openwfe/util/schedulers' require 'openwfe/expool/wfidgen' require 'openwfe/expool/expressionpool' require 'openwfe/expool/expstorage' require 'openwfe/expool/errorjournal' +require 'openwfe/engine/process_status' require 'openwfe/expressions/environment' require 'openwfe/expressions/expressionmap' require 'openwfe/participants/participantmap' @@ -64,10 +65,11 @@ # No persistence is used, everything is stored in memory. # class Engine < Service include OwfeServiceLocator include FeiMixin + include StatusMixin # # Builds an OpenWFEru engine. # # Accepts an optional initial application_context (containing @@ -77,11 +79,11 @@ # where all the log output for OpenWFEru should go. # By default, this output goes to logs/openwferu.log # def initialize (application_context={}) - super(S_ENGINE, application_context) + super S_ENGINE, application_context $OWFE_LOG = application_context[:logger] unless $OWFE_LOG #puts "Creating logs in " + FileUtils.pwd @@ -264,17 +266,53 @@ # # Registers a participant in this [embedded] engine. # This method is a shortcut to the ParticipantMap method # with the same name. # + # engine.register_participant "user-.*", HashParticipant + # + # or + # + # engine.register_participant "user-.*" do |wi| + # puts "participant '#{wi.participant_name}' received a workitem" + # # + # # and did nothing with it + # # as a block participant implicitely returns the workitem + # # to the engine + # end + # # Returns the participant instance. # + # The participant parameter can be set to hash like in + # + # engine.register_participant( + # "alpha", + # { :participant => HashParticipant, :position => :first }) + # + # or + # + # engine.register_participant("alpha", :position => :first) do + # puts "first !" + # end + # + # There are some times where you have to position a participant first + # (especially with the regex technique). + # # see ParticipantMap#register_participant # def register_participant (regex, participant=nil, &block) - get_participant_map.register_participant(regex, participant, &block) + #get_participant_map.register_participant( + # regex, participant, &block) + + params = if participant.class == Hash + participant + else + { :participant => participant } + end + + get_participant_map.register_participant regex, params, &block end # # Given a participant name, returns the participant in charge # of handling workitems for that name. @@ -329,11 +367,11 @@ if freq freq = freq.to_s.strip - result = if Scheduler.is_cron_string(freq) + result = if Rufus::Scheduler.is_cron_string(freq) get_scheduler.schedule(freq, listener) else get_scheduler.schedule_every(freq, listener) @@ -366,11 +404,11 @@ # Calling this method makes the control flow block until the # workflow engine is inactive. # # TODO : implement idle_for # - def join_until_idle () + def join_until_idle storage = get_expression_storage while storage.size > 1 sleep 1 @@ -408,21 +446,21 @@ # def stop linfo { "stop() stopping engine '#{@service_name}'" } - @application_context.each do |name, service| + @application_context.each do |sname, service| - next if name == self.service_name + next if sname == self.service_name - #service.stop if service.respond_to? :stop + #if service.kind_of?(ServiceMixin) + if service.respond_to?(:stop) - if service.kind_of? ServiceMixin service.stop + linfo do - "stop() stopped service '#{service.service_name}' "+ - "(#{service.class})" + "stop() stopped service '#{sname}' (#{service.class})" end end end linfo { "stop() stopped engine '#{@service_name}'" } @@ -455,73 +493,29 @@ t.wakeup if (fe.fei.workflow_instance_id == wfid and t.alive?) end te = get_expression_pool.add_observer(:error) do |c, fei, m, i, e| t.wakeup if (fei.parent_wfid == wfid and t.alive?) end + #tc = get_expression_pool.add_observer(:cancel) do |c, fe| + # if (fe.fei.wfid == wfid and fe.fei.expid == "0" and t.alive?) + # sleep 0.500 + # t.wakeup + # end + #end linfo { "wait_for() #{wfid}" } t.join get_expression_pool.remove_observer(to, :terminate) get_expression_pool.remove_observer(te, :error) + #get_expression_pool.remove_observer(tc, :cancel) # # it would work as well without specifying the channel, # but it's thus a little bit faster end - # - # Returns a hash of ProcessStatus instances. The keys of the hash - # are workflow instance ids. - # - # A ProcessStatus is a description of the state of a process instance. - # It enumerates the expressions where the process is currently - # located (waiting certainly) and the errors the process currently - # has (hopefully none). - # - def list_process_status (wfid=nil) - - wfid = to_wfid(wfid) if wfid - - result = {} - - get_expression_storage.real_each(wfid) do |fei, fexp| - next if fexp.kind_of?(Environment) - next unless fexp.apply_time - (result[fei.parent_wfid] ||= ProcessStatus.new) << fexp - end - - result.values.each do |ps| - get_error_journal.get_error_log(ps.wfid).each do |error| - ps << error - end - end - - class << result - def to_s - pretty_print_process_status(self) - end - end - - result - end - - # - # list_process_status() will be deprecated at release 1.0.0 - # - alias :get_process_status :list_process_status - - # - # Returns the process status of one given process instance. - # - def process_status (wfid) - - wfid = to_wfid(wfid) - - list_process_status(wfid).values[0] - end - #-- # METHODS FROM THE EXPRESSION POOL # # These methods are 'proxy' to method found in the expression pool. # They are made available here for a simpler model. @@ -533,76 +527,98 @@ # May be used to determine where a process instance currently is. # # This method returns all the expressions (the stack) a process # went through to reach its current state. # - def get_process_stack (workflow_instance_id) + # If the unapplied optional parameter is set to true, all the + # expressions (even those not yet applied) that compose the process + # instance will be returned. + # + def process_stack (workflow_instance_id, unapplied=false) - get_expression_pool.get_process_stack(workflow_instance_id) + get_expression_pool.process_stack workflow_instance_id, unapplied end - alias :get_flow_stack :get_process_stack + alias :get_process_stack :process_stack + alias :get_flow_stack :process_stack # # Lists all workflow (process) instances currently in the expool (in # the engine). # This method will return a list of "process-definition" expressions # (i.e. OpenWFE::DefineExpression objects -- each representing the root # element of a flow). # - # consider_subprocesses :: if true, "process-definition" expressions of - # subprocesses will be returned as well. - # wfid_prefix :: allows your to query for specific workflow instance - # id prefixes. + # :wfid :: + # will list only one process, + # <tt>:wfid => '20071208-gipijiwozo'</tt> + # :parent_wfid :: + # will list only one process, and its subprocesses, + # <tt>:parent_wfid => '20071208-gipijiwozo'</tt> + # :consider_subprocesses :: + # if true, "process-definition" expressions + # of subprocesses will be returned as well. + # :wfid_prefix :: + # allows your to query for specific workflow instance + # id prefixes. for example : + # <tt>:wfid_prefix => "200712"</tt> + # for the processes started in December. + # :wfname :: + # will return only the process instances who belongs to the given + # workflow [name]. + # :wfrevision :: + # usued in conjuction with :wfname, returns only the process + # instances of a given workflow revision. # - def list_processes (consider_subprocesses=false, wfid_prefix=nil) + def list_processes (options={}) - get_expression_pool.list_processes( - consider_subprocesses, wfid_prefix) + get_expression_pool.list_processes options end alias :list_workflows :list_processes # # Given any expression of a process, cancels the complete process # instance. # def cancel_process (exp_or_wfid) - get_expression_pool.cancel_process(exp_or_wfid) + get_expression_pool.cancel_process exp_or_wfid end alias :cancel_flow :cancel_process + alias :abort_process :cancel_process # # Cancels the given expression (and its children if any) # (warning : advanced method) # # Cancelling the root expression of a process is equivalent to # cancelling the process. # def cancel_expression (exp_or_fei) - get_expression_pool.cancel_expression(exp_or_fei) + get_expression_pool.cancel_expression exp_or_fei end # # Forgets the given expression (make it an orphan) # (warning : advanced method) # def forget_expression (exp_or_fei) - get_expression_pool.forget(exp_or_fei) + get_expression_pool.forget exp_or_fei end # # Pauses a process (sets its /__paused__ variable to true). # def pause_process (wfid) - wfid = extract_wfid(wfid) + wfid = extract_wfid wfid - root_expression = get_expression_pool.fetch_root(wfid) + root_expression = get_expression_pool.fetch_root wfid - root_expression.set_variable(VAR_PAUSED, true) + get_expression_pool.paused_instances[wfid] = true + root_expression.set_variable VAR_PAUSED, true end # # Restarts a process : removes its 'paused' flag (variable) and makes # sure to 'replay' events (replies) that came for it while it was @@ -615,11 +631,12 @@ root_expression = get_expression_pool.fetch_root wfid # # remove 'paused' flag - root_expression.unset_variable(VAR_PAUSED) + get_expression_pool.paused_instances.delete wfid + root_expression.unset_variable VAR_PAUSED # # replay # # select PausedError instances in separate list @@ -661,22 +678,29 @@ def lookup_variable (var_name, fei_or_wfid=nil) return get_expression_pool.fetch_engine_environment[var_name] \ unless fei_or_wfid - exp = if fei_or_wfid.is_a?(String) + fetch_exp(fei_or_wfid).lookup_variable var_name + end - get_expression_pool.fetch_root(fei_or_wfid) + # + # Returns the variables set for a process or an expression. + # + # If a process (wfid) is given, variables of the process environment + # will be returned, else variables in the environment valid for the + # expression (fei) will be returned. + # + # If nothing (or nil) is given, the variables set in the engine + # environment will be returned. + # + def get_variables (fei_or_wfid=nil) - else + return get_expression_pool.fetch_engine_environment.variables \ + unless fei_or_wfid - get_expression_pool.fetch_expression(fei_or_wfid) - end - - raise "no expression found for '#{fei_or_wfid.to_s}'" unless exp - - exp.lookup_variable var_name + fetch_exp(fei_or_wfid).get_environment.variables end # # Returns an array of wfid (workflow instance ids) whose root # environment containes the given variable @@ -690,35 +714,99 @@ # def lookup_processes (var_name, value=nil) # TODO : maybe this would be better in the ExpressionPool - result = [] - regexp = if value if value.is_a?(Regexp) value else Regexp.compile(value.to_s) end else nil end - get_expression_storage.each_of_kind(Environment) do |fei, env| + envs = get_expression_storage.find_expressions( + :include_classes => Environment) + envs = envs.find_all do |env| val = env.variables[var_name] + (val and ((not regexp) or (regexp.match(val)))) + end + envs.collect do |env| + env.fei.wfid + end - next unless val - next if regexp and (not regexp.match(val)) + #envs.inject([]) do |r, env| + # val = env.variables[var_name] + # r << env.fei.wfid \ + # if (val and ((not regexp) or (regexp.match(val)))) + # r + #end + # + # seems slower... + end - result.push env.fei.wfid + # + # Use only when doing "process gardening". + # + # This method updates an expression, the 'data' parameter is expected + # to be a hash. If the expression is an Environment, the variables + # will be merged with the ones found in the data param. + # If the expression is not an Environment, the data will be merged + # into the 'applied_workitem' if any. + # + # If the merge is not possible, an exception will be raised. + # + def update_expression_data (fei, data) + + fexp = fetch_exp fei + + original = if fexp.is_a?(Environment) + + fexp.variables + else + + fexp.applied_workitem.attributes end - result + original.merge! data + + get_expression_pool.update fexp end + # + # A variant of update_expression() that directly replaces + # the raw representation stored within a RawExpression. + # + # Useful for modifying [not yet reached] segments of processes. + # + def update_raw_expression (fei, representation) + + fexp = fetch_exp fei + + raise "cannot update already applied expression" \ + unless fexp.is_a?(RawExpression) + + fexp.raw_representation = representation + + get_expression_pool.update fexp + end + + # + # Replaces an expression in the pool with a newer version of it. + # + # (useful when fixing processes on the fly) + # + def update_expression (fexp) + + fexp.application_context = application_context + + get_expression_pool.update fexp + end + protected #-- # the following methods may get overridden upon extension # see for example file_persisted_engine.rb @@ -790,11 +878,15 @@ # There is only one Scheduler implementation, that's the one # built and bound here. # def build_scheduler - init_service S_SCHEDULER, SchedulerService + scheduler = Rufus::Scheduler.new + + @application_context[S_SCHEDULER] = scheduler + + scheduler.start end # # The default implementation of this method uses an # InMemoryErrorJournal (do not use in production). @@ -819,11 +911,10 @@ elsif launch_object.kind_of?(String) li = OpenWFE::LaunchItem.new - #if launch_object[0, 1] == '<' or launch_object.match("\n") if launch_object[0, 1] == '<' or launch_object.index("\n") li.workflow_definition_url = "field:__definition" li['__definition'] = launch_object @@ -834,166 +925,26 @@ li end end - end + # + # In case of wfid, returns the root expression of the process, + # in case of fei, returns the expression itself. + # + def fetch_exp (fei_or_wfid) - # - # ProcessStatus represents information about the status of a workflow - # process instance. - # - # The status is mainly a list of expressions and a hash of errors. - # - # Instances of this class are obtained via Engine.process_status(). - # - class ProcessStatus + exp = if fei_or_wfid.is_a?(String) - # - # the String workflow instance id of the Process. - # - attr_reader :wfid + get_expression_pool.fetch_root fei_or_wfid - # - # The list of the expressions currently active in the process instance. - # - # For instance, if your process definition is currently in a - # concurrence, more than one expressions may be listed here. - # - attr_reader :expressions + else - # - # A hash whose values are ProcessError instances, the keys - # are FlowExpressionId instances (fei) (identifying the expressions - # that are concerned with the error) - # - attr_reader :errors - - # - # The time at which the process got launched. - # - attr_reader :launch_time - - def initialize - @wfid = nil - @expressions = [] - @errors = {} - @launch_time = nil - end - - # - # Returns true if the process is in pause. - # - def paused? - - exp = @expressions[0] - exp != nil and exp.paused? - end - - # - # this method is used by Engine.get_process_status() when - # it prepares its results. - # - def << (item) - - if item.kind_of?(FlowExpression) - add_expression item - else - add_error item - end - end - - # - # A String representation, handy for debugging, quick viewing. - # - def to_s - s = "" - s << "-- #{self.class.name} --\n" - s << " wfid : #{@wfid}\n" - s << " expressions :\n" - @expressions.each do |fexp| - s << " #{fexp.fei}\n" - end - s << " errors : #{@errors.size}\n" - s << " paused : #{paused?}" - s - end - - protected - - def add_expression (fexp) - - set_wfid fexp.fei.parent_wfid - - @launch_time = fexp.apply_time if fexp.fei.expid == '0' - - exps = @expressions - @expressions = [] - - added = false - @expressions = exps.collect do |fe| - if added or fe.fei.wfid != fexp.fei.wfid - fe - else - if OpenWFE::starts_with(fexp.fei.expid, fe.fei.expid) - added = true - fexp - elsif OpenWFE::starts_with(fe.fei.expid, fexp.fei.expid) - added = true - fe - else - fe - end - end + get_expression_pool.fetch_expression fei_or_wfid end - @expressions << fexp unless added - end - def add_error (error) - @errors[error.fei] = error + exp or raise "no expression found for '#{fei_or_wfid.to_s}'" end - - def set_wfid (wfid) - - return if @wfid - @wfid = wfid - end - end - - # - # Renders a nice, terminal oriented, representation of an - # Engine.get_process_status() result. - # - # You usually directly benefit from this when doing - # - # puts engine.get_process_status.to_s - # - def pretty_print_process_status (ps) - - s = "" - s << "process_id | name | rev | brn | err | paused? \n" - s << "--------------------+-------------------+---------+-----+-----+---------\n" - - ps.keys.sort.each do |wfid| - - status = ps[wfid] - fexp = status.expressions[0] - ffei = fexp.fei - - s << "%-19s" % wfid[0, 19] - s << " | " - s << "%-17s" % ffei.workflow_definition_name[0, 17] - s << " | " - s << "%-7s" % ffei.workflow_definition_revision[0, 7] - s << " | " - s << "%3s" % status.expressions.size.to_s[0, 3] - s << " | " - s << "%3s" % status.errors.size.to_s[0, 3] - s << " | " - s << "%5s" % status.paused?.to_s - s << "\n" - end - s end end