lib/openwfe/engine/engine.rb in openwferu-0.9.15 vs lib/openwfe/engine/engine.rb in openwferu-0.9.16

- old
+ new

@@ -84,50 +84,50 @@ $OWFE_LOG = application_context[:logger] unless $OWFE_LOG #puts "Creating logs in " + FileUtils.pwd FileUtils.mkdir("logs") unless File.exist?("logs") - $OWFE_LOG = Logger.new("logs/openwferu.log", 10, 1024000) + $OWFE_LOG = Logger.new "logs/openwferu.log", 10, 1024000 $OWFE_LOG.level = Logger::INFO end # build order matters. # # especially for the expstorage which 'observes' the expression # pool and thus needs to be instantiated after it. - build_scheduler() + build_scheduler # # for delayed or repetitive executions (it's the engine's clock) # see http://openwferu.rubyforge.org/scheduler.html - build_expression_map() + build_expression_map # # mapping expression names ('sequence', 'if', 'concurrence', # 'when'...) to their implementations (SequenceExpression, # IfExpression, ConcurrenceExpression, ...) - build_wfid_generator() + build_wfid_generator # # the workflow instance (process instance) id generator # making sure each process instance has a unique identifier - build_expression_pool() + build_expression_pool # # the core (hairy ball) of the engine - build_expression_storage() + build_expression_storage # # the engine persistence (persisting the expression instances # that make up process instances) - build_participant_map() + build_participant_map # # building the services that maps participant names to # participant implementations / instances. - build_error_journal() + build_error_journal # # builds the error journal (keeping track of failures # in business process executions, and an opportunity to # fix and replay) @@ -210,26 +210,54 @@ # it got sent to a worklist or wherever by a participant). # Participant implementations themselves do call this method usually. # # This method also accepts LaunchItem instances. # + # Since OpenWFEru 0.9.16, this reply method accepts InFlowWorkitem + # that don't belong to a process instance (ie whose flow_expression_id + # is nil). It will simply notify the participant_map of the reply + # for the given participant_name. If there is no participant_name + # specified for this orphan workitem, an exception will be raised. + # def reply (workitem) - if workitem.kind_of?(InFlowWorkItem) + if workitem.is_a?(InFlowWorkItem) - get_expression_pool.reply workitem.flow_expression_id, workitem + if workitem.flow_expression_id + # + # vanilla case, workitem coming back + # (from listener probably) - elsif workitem.kind_of?(LaunchItem) + return get_expression_pool.reply( + workitem.flow_expression_id, workitem) + end - get_expression_pool.launch workitem + if workitem.participant_name + # + # a workitem that doesn't belong to a process instance + # but bears a participant name. + # Notify, there may be something listening on + # this channel (see the 'listen' expression). - else + return get_participant_map.onotify( + workitem.participant_name, :reply, workitem) + end raise \ - "engine.reply() " + - "cannot handle instances of #{workitem.class}" + "InFlowWorkitem doesn't belong to a process instance" + + " nor to a participant" end + + return get_expression_pool.launch(workitem) \ + if workitem.is_a?(LaunchItem) + # + # launchitem coming from listener + # let's attempt to launch a new process instance + + raise \ + "engine.reply() " + + "cannot handle instances of #{workitem.class}" end alias :forward :reply alias :proceed :reply @@ -252,20 +280,20 @@ # of handling workitems for that name. # May be useful in some embedded contexts. # def get_participant (participant_name) - get_participant_map.lookup_participant(participant_name) + get_participant_map.lookup_participant participant_name end # # Removes the first participant matching the given name from the # participant map kept by the engine. # def unregister_participant (participant_name) - get_participant_map.unregister_participant(participant_name) + get_participant_map.unregister_participant participant_name end # # Adds a workitem listener to this engine. # @@ -566,24 +594,68 @@ # # Pauses a process (sets its /__paused__ variable to true). # def pause_process (wfid) - get_expression_pool.pause_process(wfid) + wfid = extract_wfid(wfid) + + root_expression = get_expression_pool.fetch_root(wfid) + + root_expression.set_variable(VAR_PAUSED, true) end # # Restarts a process : removes its 'paused' flag (variable) and makes # sure to 'replay' events (replies) that came for it while it was # in pause. # def resume_process (wfid) - get_expression_pool.resume_process(wfid) + wfid = extract_wfid wfid + + root_expression = get_expression_pool.fetch_root wfid + + # + # remove 'paused' flag + + root_expression.unset_variable(VAR_PAUSED) + + # + # replay + # + # select PausedError instances in separate list + + errors = get_error_journal.get_error_log wfid + error_class = PausedError.name + paused_errors = errors.select { |e| e.error_class == error_class } + + return if paused_errors.size < 1 + + # replay select PausedError instances + + paused_errors.each do |e| + replay_at_error e + end end # + # Takes care of removing an error from the error journal and + # they replays its process at that point. + # + def replay_at_error (error) + + get_error_journal.remove_errors( + error.fei.parent_wfid, + error) + + get_expression_pool.queue_work( + error.message, + error.fei, + error.workitem) + end + + # # Looks up a process variable in a process. # If fei_or_wfid is not given, will simply look in the # 'engine environment' (where the top level variables '//' do reside). # def lookup_variable (var_name, fei_or_wfid=nil) @@ -603,10 +675,50 @@ raise "no expression found for '#{fei_or_wfid.to_s}'" unless exp exp.lookup_variable var_name end + # + # Returns an array of wfid (workflow instance ids) whose root + # environment containes the given variable + # + # If there are no matches, an empty array will be returned. + # + # Regular expressions are accepted as values. + # + # If no value is given, all processes with the given variable name + # set will be returned. + # + def lookup_processes (var_name, value=nil) + + # TODO : maybe this would be better in the ExpressionPool + + result = [] + + regexp = if value + if value.is_a?(Regexp) + value + else + Regexp.compile(value.to_s) + end + else + nil + end + + get_expression_storage.each_of_kind(Environment) do |fei, env| + + val = env.variables[var_name] + + next unless val + next if regexp and (not regexp.match(val)) + + result.push env.fei.wfid + end + + result + end + protected #-- # the following methods may get overridden upon extension # see for example file_persisted_engine.rb @@ -647,11 +759,11 @@ # There is only one implementation of the expression pool, so # this method is usually never overriden. # def build_expression_pool - init_service(S_EXPRESSION_POOL, ExpressionPool) + init_service S_EXPRESSION_POOL, ExpressionPool end # # The implementation here builds an InMemoryExpressionStorage # instance. @@ -659,39 +771,39 @@ # See FilePersistedEngine or CachedFilePersistedEngine for # overrides of this method. # def build_expression_storage - init_service(S_EXPRESSION_STORAGE, InMemoryExpressionStorage) + init_service S_EXPRESSION_STORAGE, InMemoryExpressionStorage end # # The ParticipantMap is a mapping between participant names # (well rather regular expressions) and participant implementations # (see http://openwferu.rubyforge.org/participants.html) # def build_participant_map - init_service(S_PARTICIPANT_MAP, ParticipantMap) + init_service S_PARTICIPANT_MAP, ParticipantMap end # # There is only one Scheduler implementation, that's the one # built and bound here. # def build_scheduler - init_service(S_SCHEDULER, SchedulerService) + init_service S_SCHEDULER, SchedulerService end # # The default implementation of this method uses an # InMemoryErrorJournal (do not use in production). # def build_error_journal - init_service(S_ERROR_JOURNAL, InMemoryErrorJournal) + init_service S_ERROR_JOURNAL, InMemoryErrorJournal end # # Turns the raw launch request info into a LaunchItem instance. # @@ -754,14 +866,20 @@ # are FlowExpressionId instances (fei) (identifying the expressions # that are concerned with the error) # attr_reader :errors + # + # The time at which the process got launched. + # + attr_reader :launch_time + def initialize @wfid = nil @expressions = [] @errors = {} + @launch_time = nil end # # Returns true if the process is in pause. # @@ -804,10 +922,10 @@ def add_expression (fexp) set_wfid fexp.fei.parent_wfid - #@expressions << fexp + @launch_time = fexp.apply_time if fexp.fei.expid == '0' exps = @expressions @expressions = [] added = false