lib/openwfe/expool/expressionpool.rb in openwferu-0.9.2 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.3

- old
+ new

@@ -47,10 +47,11 @@ require 'openwfe/utils' require 'openwfe/service' require 'openwfe/logging' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' +require 'openwfe/util/stoppable' require 'openwfe/util/lru_cache' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw_xml' include OpenWFE @@ -118,23 +119,37 @@ # The ExpressionPool stores expressions (pieces of workflow instance). # It's the core of the workflow engine. # It relies on an expression storage for actual persistence of the # expressions. # - class ExpressionPool < Service - include MonitorMixin, OwfeServiceLocator + class ExpressionPool + include ServiceMixin, MonitorMixin, OwfeServiceLocator, Stoppable @@last_given_instance_id = -1 # # storing at class level the last workflow instance id given - def initialize (serviceName, applicationContext) - super(serviceName, applicationContext) + def initialize (service_name, application_context) + + super() + + service_init(service_name, application_context) + @monitors = MonitorProvider.new(application_context) + + reschedule_a_bit_later end # + # Makes sure to call the do_stop() method of the Stoppable mixin + # + def stop + # would an alias be better ? + do_stop + end + + # # Obtains a unique monitor for an expression. # It avoids the need for the FlowExpression instances to include # the monitor mixin by themselves # def get_monitor (fei) @@ -149,21 +164,32 @@ rawExpression = buildRawExpression(launchitem) wi = build_workitem(launchitem) rawExpression.apply(wi) + + return wi.flow_expression_id end # # launches a subprocess # def launch_template \ - (requesting_expression, template_fei, workitem, params=nil) + (requesting_expression, template, workitem, params=nil) - ldebug { "launch() request for #{template_fei.to_debug_s}" } + #ldebug { "launch_template() of class #{template.class}" } - rawexp, fei = fetch(template_fei) + rawexp = nil + + if template.kind_of? FlowExpressionId + rawexp, fei = fetch(template) + else # template is of kind RawExpression + rawexp = template + end + + ldebug { "launch_template() request for #{rawexp.fei.to_debug_s}" } + rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() if requesting_expression.kind_of? FlowExpressionId rawexp.parent_id = requesting_expression @@ -176,10 +202,18 @@ else # kind is FlowExpression rawexp.parent_id = requesting_expression.fei rawexp.fei.workflow_instance_id = \ "#{requesting_expression.fei.workflow_instance_id}.0" end + + #ldebug do + # p = "" + # p = rawexp.parent_id.to_debug_s if rawexp.parent_id + # "launch_template()\n"+ + # " rawexp.fei is #{rawexp.fei.to_debug_s}\n"+ + # " rawexp.parent_id is #{p}" + #end #ldebug do # "launch_template() spawning wfid " + # "#{rawexp.fei.workflow_instance_id.to_s}" #end @@ -191,10 +225,12 @@ # the new scope gets its own environment rawexp.store_itself() rawexp.apply(workitem) + + return workitem.flow_expression_id end # # Evaluates a raw definition expression and # returns its body fei @@ -268,12 +304,18 @@ exp, fei = fetch(exp) workitem.last_expression_id = fei - remove(exp, workitem) + #remove(exp, workitem) + remove(exp) + # + # remove all the children of the expression + + exp.clean_children() + if not exp.parent_id ldebug do "reply_to_parent() process " + "#{exp.fei.workflow_instance_id} terminated" end @@ -323,18 +365,22 @@ # The param 'exp' may be a FlowExpressionId or a FlowExpression that # has to be reloaded. # def fetch (exp) synchronize do + fei = exp if exp.kind_of? FlowExpression fei = exp.fei elsif not exp.kind_of? FlowExpressionId raise \ "Cannot fetch expression with key : "+ "'#{fei}' (#{fei.class})" end + + ldebug { "fetch() for #{fei.to_debug_s}" } + return get_expression_storage()[fei], fei end end # @@ -358,21 +404,19 @@ ee = Environment\ .new(eei, nil, nil, @application_context, nil) ee.store_itself() end - ldebug { "fetch_engine_environment() stored new ee" } - return ee end end # # Removes a flow expression from the pool # (This method is mainly called from the pool itself) # - def remove (exp, workitem=nil) + def remove (exp) exp, fei = fetch(exp) return if not exp @@ -380,20 +424,58 @@ synchronize do @monitors.delete(fei) - #get_expression_storage().delete(fei) - get_expression_storage().remove(fei, workitem) + #get_expression_storage().remove(fei, workitem) + get_expression_storage().delete(fei) if exp.owns_its_environment? remove_environment(exp.environment_id) end end end + # + # This method is called at each expool (engine) [re]start. + # It roams through the previously saved (persisted) expressions + # to reschedule ones like 'sleep' or 'cron'. + # + def reschedule + synchronize do + + #if is_stopped? + # linfo { "reschedule() skipped as expool is stopped" } + # return + #end + #if get_scheduler.is_stopped? + # linfo do + # "reschedule() skipped as scheduler "+ + # "#{get_scheduler.object_id} is stopped" + # end + # return + #end + + ldebug { "reschedule() initiating..." } + + get_expression_storage.each_of_kind(Schedulable) do |fe| + + ldebug { "reschedule() for #{fe.fei.to_debug_s}..." } + + fe.reschedule(get_scheduler) + end + + ldebug { "reschedule() done." } + end + end + + # + # Returns the unique engine_environment FlowExpressionId instance. + # There is only one such environment in an engine, hence this + # 'singleton' method. + # def engine_environment_id () synchronize do return @eei if @eei @eei = FlowExpressionId.new @eei.owfe_version = OPENWFE_VERSION @@ -409,18 +491,37 @@ end end protected + def reschedule_a_bit_later + Thread.new do + # + # Just leaving some time for the initialize() to finish + # and let the expression pool get registered in + # the application context + # + begin + sleep(0.001) + reschedule() + rescue + lwarn do + "reschedule() failed\n"+ + OpenWFE::exception_to_s($!) + end + end + end + end + def evaluate_definition (raw_definition, workitem) expression = raw_definition.instantiate(workitem) end def remove_environment (environment_id) env, fei = fetch(environment_id) env.unbind() - get_expression_storage().remove(environment_id, nil) + get_expression_storage().delete(environment_id) end def build_workitem (launchitem) wi = InFlowWorkItem.new() @@ -496,9 +597,10 @@ fei.expression_name = exp_name return fei end def new_workflow_instance_id () + synchronize do wfid = OpenWFE::current_time_millis() wfid = wfid + 1 if wfid == @@last_given_instance_id @@last_given_instance_id = wfid return wfid.to_s