lib/openwfe/expool/expressionpool.rb in openwferu-0.9.3 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.4

- old
+ new

@@ -49,10 +49,11 @@ require 'openwfe/logging' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' require 'openwfe/util/stoppable' require 'openwfe/util/lru_cache' +require 'openwfe/util/observable' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw_xml' include OpenWFE @@ -120,11 +121,16 @@ # It's the core of the workflow engine. # It relies on an expression storage for actual persistence of the # expressions. # class ExpressionPool - include ServiceMixin, MonitorMixin, OwfeServiceLocator, Stoppable + include \ + ServiceMixin, + MonitorMixin, + OwfeServiceLocator, + Stoppable, + Observable @@last_given_instance_id = -1 # # storing at class level the last workflow instance id given @@ -134,19 +140,23 @@ service_init(service_name, application_context) @monitors = MonitorProvider.new(application_context) + @observers = {} + reschedule_a_bit_later end # # Makes sure to call the do_stop() method of the Stoppable mixin # def stop # would an alias be better ? do_stop + + onotify :stop end # # Obtains a unique monitor for an expression. # It avoids the need for the FlowExpression instances to include @@ -159,10 +169,12 @@ # # Instantiates a workflow definition and launches it. # def launch (launchitem) + onotify :launch, launchitem.workflow_definition_url + rawExpression = buildRawExpression(launchitem) wi = build_workitem(launchitem) rawExpression.apply(wi) @@ -186,10 +198,12 @@ rawexp = template end ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" } + onotify :launch_template, rawexp.fei + rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() if requesting_expression.kind_of? FlowExpressionId rawexp.parent_id = requesting_expression @@ -224,12 +238,16 @@ # # the new scope gets its own environment rawexp.store_itself() + workitem.flow_expression_id = rawexp.fei + rawexp.apply(workitem) + # why not : launch in a thread and reply immediately + return workitem.flow_expression_id end # # Evaluates a raw definition expression and @@ -247,21 +265,23 @@ # def apply (exp, workitem) exp, fei = fetch(exp) - ldebug { "apply() '#{fei}' (#{fei.class})" } + #ldebug { "apply() '#{fei}' (#{fei.class})" } if not exp lwarn { "apply() cannot apply missing #{fei.to_debug_s}" } return end - ldebug { "apply() #{fei.to_debug_s}" } + #ldebug { "apply() #{fei.to_debug_s}" } - workitem.last_expression_id = exp.fei + onotify :apply, fei, workitem + workitem.flow_expression_id = exp.fei + exp.apply(workitem) end # # Cancels the given expression @@ -275,10 +295,12 @@ return nil end ldebug { "cancel() for #{fei.to_debug_s}" } + onotify :cancel, fei + inflowitem = exp.cancel() remove(exp) return inflowitem end @@ -291,10 +313,12 @@ exp, fei = fetch(exp) return if not exp + onotify :forget, fei + exp.parent_id = GONE_PARENT_ID exp.store_itself() end # @@ -304,10 +328,12 @@ exp, fei = fetch(exp) workitem.last_expression_id = fei + onotify :reply_to_parent, fei, workitem + #remove(exp, workitem) remove(exp) # # remove all the children of the expression @@ -345,18 +371,22 @@ #raise "cannot reply to missing #{fei.to_debug_s}" lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" } return end + onotify :reply, fei, workitem + exp.reply(workitem) end # # Adds or updates a flow expression in this pool # def update (flowExpression) + onotify :update, flowExpression.fei, flowExpression + get_expression_storage()[flowExpression.fei] = flowExpression end # # Fetches a FlowExpression from the pool. @@ -420,10 +450,12 @@ return if not exp ldebug { "remove() fe #{fei.to_debug_s}" } + onotify :remove, fei + synchronize do @monitors.delete(fei) #get_expression_storage().remove(fei, workitem) @@ -460,10 +492,12 @@ get_expression_storage.each_of_kind(Schedulable) do |fe| ldebug { "reschedule() for #{fe.fei.to_debug_s}..." } + onotify :reschedule, fe.fei + fe.reschedule(get_scheduler) end ldebug { "reschedule() done." } end @@ -476,11 +510,11 @@ # def engine_environment_id () synchronize do return @eei if @eei @eei = FlowExpressionId.new - @eei.owfe_version = OPENWFE_VERSION + @eei.owfe_version = OPENWFERU_VERSION @eei.engine_id = get_engine.service_name @eei.initial_engine_id = @eei.engine_id @eei.workflow_definition_url = 'ee' @eei.workflow_definition_name = 'ee' @eei.workflow_definition_revision = '0' @@ -515,12 +549,17 @@ def evaluate_definition (raw_definition, workitem) expression = raw_definition.instantiate(workitem) end def remove_environment (environment_id) + + ldebug { "remove_environment() #{environment_id.to_debug_s}" } + env, fei = fetch(environment_id) + env.unbind() + get_expression_storage().delete(environment_id) end def build_workitem (launchitem) @@ -584,10 +623,10 @@ def new_fei (flow_url, flow_name, flow_revision, exp_name) fei = FlowExpressionId.new - fei.owfe_version = OPENWFE_VERSION + fei.owfe_version = OPENWFERU_VERSION fei.engine_id = get_engine.service_name fei.initial_engine_id = fei.engine_id fei.workflow_definition_url = flow_url fei.workflow_definition_name = flow_name fei.workflow_definition_revision = flow_revision