lib/openwfe/expool/expressionpool.rb in openwferu-0.9.4 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.5

- old
+ new

@@ -47,12 +47,11 @@ require 'openwfe/utils' require 'openwfe/service' require 'openwfe/logging' require 'openwfe/rudefinitions' require 'openwfe/flowexpressionid' -require 'openwfe/util/stoppable' -require 'openwfe/util/lru_cache' +require 'openwfe/util/lru' require 'openwfe/util/observable' require 'openwfe/expressions/environment' require 'openwfe/expressions/raw_xml' include OpenWFE @@ -70,11 +69,11 @@ MAX_MONITORS = 10000 def initialize (application_context=nil) super() @application_context = application_context - @monitors = LRUCache.new(MAX_MONITORS) + @monitors = LruHash.new(MAX_MONITORS) end def [] (key) synchronize do #ldebug { "[] caller :\n" + OpenWFE::caller_to_s(8) } @@ -125,11 +124,10 @@ class ExpressionPool include \ ServiceMixin, MonitorMixin, OwfeServiceLocator, - Stoppable, Observable @@last_given_instance_id = -1 # # storing at class level the last workflow instance id given @@ -142,20 +140,18 @@ @monitors = MonitorProvider.new(application_context) @observers = {} - reschedule_a_bit_later + @stopped = false end # # Makes sure to call the do_stop() method of the Stoppable mixin # def stop - # would an alias be better ? - do_stop - + @stopped = true onotify :stop end # # Obtains a unique monitor for an expression. @@ -167,28 +163,46 @@ end # # Instantiates a workflow definition and launches it. # - def launch (launchitem) + # If async is set to true, the launch will occur in its own thread. + # + # Returns the FlowExpressionId instance of the root expression of + # the newly launched flow. + # If async is set to true, returns a tuple FlowExpressionId / + # Thread instance used. + # + def launch (launchitem, async=false) onotify :launch, launchitem.workflow_definition_url - rawExpression = buildRawExpression(launchitem) + raw_expression = buildRawExpression(launchitem) wi = build_workitem(launchitem) - rawExpression.apply(wi) + fei = raw_expression.fei - return wi.flow_expression_id + if async + + t = OpenWFE::call_in_thread "launch()", self do + raw_expression.apply(wi) + end + + return fei, t + end + + raw_expression.apply(wi) + + return fei end # # launches a subprocess # - def launch_template \ - (requesting_expression, template, workitem, params=nil) + def launch_template ( + requesting_expression, sub_id, template, workitem, params=nil) #ldebug { "launch_template() of class #{template.class}" } rawexp = nil @@ -203,22 +217,32 @@ onotify :launch_template, rawexp.fei rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() - if requesting_expression.kind_of? FlowExpressionId + if requesting_expression == nil + + rawexp.parent_id = nil + rawexp.fei.workflow_instance_id = new_workflow_instance_id() + + elsif requesting_expression.kind_of? FlowExpressionId + rawexp.parent_id = requesting_expression rawexp.fei.workflow_instance_id = \ - "#{requesting_expression.workflow_instance_id}.0" + "#{requesting_expression.workflow_instance_id}.#{sub_id}" + elsif requesting_expression.kind_of? String + rawexp.parent_id = nil rawexp.fei.workflow_instance_id = \ - "#{requesting_expression}.0" + "#{requesting_expression}.${sub_id}" + else # kind is FlowExpression + rawexp.parent_id = requesting_expression.fei rawexp.fei.workflow_instance_id = \ - "#{requesting_expression.fei.workflow_instance_id}.0" + "#{requesting_expression.fei.workflow_instance_id}.#{sub_id}" end #ldebug do # p = "" # p = rawexp.parent_id.to_debug_s if rawexp.parent_id @@ -240,25 +264,31 @@ rawexp.store_itself() workitem.flow_expression_id = rawexp.fei + fei = rawexp.fei + rawexp.apply(workitem) - # why not : launch in a thread and reply immediately - - return workitem.flow_expression_id + return fei end # # Evaluates a raw definition expression and # returns its body fei # def evaluate (rawExpression, workitem) + exp = rawExpression.instantiate_real_expression(workitem) fei = exp.evaluate(workitem) - remove(rawExpression) + + #remove(rawExpression) + # + # not necessary, the raw expression gets overriden by + # the real expression + return fei end # # Applies a given expression (id or expression) @@ -304,10 +334,23 @@ return inflowitem end # + # Given any expression of a process, cancels the complete process. + # + def cancel_flow (exp) + + ldebug { "cancel_flow() from #{exp}" } + + root = fetch_root(exp) + cancel(root) + end + + alias :cancel_process :cancel_flow + + # # Forgets the given expression (makes sure to substitute its # parent_id with the GONE_PARENT_ID constant) # def forget (exp) @@ -324,17 +367,16 @@ # # Replies to the parent of the given expression. # def reply_to_parent (exp, workitem) - exp, fei = fetch(exp) + ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } - workitem.last_expression_id = fei + workitem.last_expression_id = exp.fei - onotify :reply_to_parent, fei, workitem + onotify :reply_to_parent, exp.fei, workitem - #remove(exp, workitem) remove(exp) # # remove all the children of the expression @@ -381,13 +423,19 @@ # # Adds or updates a flow expression in this pool # def update (flowExpression) + t = Timer.new + onotify :update, flowExpression.fei, flowExpression - get_expression_storage()[flowExpression.fei] = flowExpression + #get_expression_storage()[flowExpression.fei] = flowExpression + + ldebug { "update() took #{t.duration} ms" } + + return flowExpression end # # Fetches a FlowExpression from the pool. # Returns a tuple : the FlowExpression plus its FlowExpressionId. @@ -397,10 +445,13 @@ # def fetch (exp) synchronize do fei = exp + + #ldebug { "fetch() exp is of kind #{exp.class}" } + if exp.kind_of? FlowExpression fei = exp.fei elsif not exp.kind_of? FlowExpressionId raise \ "Cannot fetch expression with key : "+ @@ -422,10 +473,23 @@ def fetch_expression (exp) exp, _fei = fetch(exp) return exp end + # + # Fetches the root expression of a process (given any of its + # expressions). + # + def fetch_root (exp) + exp = fetch_expression(exp) + return exp unless exp.parent_id + return fetch_root(fetch_expression(exp.parent_id)) + end + + # + # Returns the engine environment (the top level environment) + # def fetch_engine_environment () synchronize do eei = engine_environment_id ee, fei = fetch(eei) @@ -444,26 +508,30 @@ # Removes a flow expression from the pool # (This method is mainly called from the pool itself) # def remove (exp) - exp, fei = fetch(exp) + #exp, fei = fetch(exp) + if exp.kind_of? FlowExpressionId + exp, _fei = fetch(exp) + end + return if not exp - ldebug { "remove() fe #{fei.to_debug_s}" } + ldebug { "remove() fe #{exp.fei.to_debug_s}" } - onotify :remove, fei + onotify :remove, exp.fei synchronize do - @monitors.delete(fei) + @monitors.delete(exp.fei) - #get_expression_storage().remove(fei, workitem) - get_expression_storage().delete(fei) + #get_expression_storage().delete(fei) if exp.owns_its_environment? + remove_environment(exp.environment_id) end end end @@ -472,23 +540,16 @@ # This method is called at each expool (engine) [re]start. # It roams through the previously saved (persisted) expressions # to reschedule ones like 'sleep' or 'cron'. # def reschedule + + return if @stopped + synchronize do - #if is_stopped? - # linfo { "reschedule() skipped as expool is stopped" } - # return - #end - #if get_scheduler.is_stopped? - # linfo do - # "reschedule() skipped as scheduler "+ - # "#{get_scheduler.object_id} is stopped" - # end - # return - #end + t = OpenWFE::Timer.new ldebug { "reschedule() initiating..." } get_expression_storage.each_of_kind(Schedulable) do |fe| @@ -497,11 +558,11 @@ onotify :reschedule, fe.fei fe.reschedule(get_scheduler) end - ldebug { "reschedule() done." } + ldebug { "reschedule() done. (took #{t.duration} ms)" } end end # # Returns the unique engine_environment FlowExpressionId instance. @@ -525,29 +586,10 @@ end end protected - def reschedule_a_bit_later - Thread.new do - # - # Just leaving some time for the initialize() to finish - # and let the expression pool get registered in - # the application context - # - begin - sleep(0.001) - reschedule() - rescue - lwarn do - "reschedule() failed\n"+ - OpenWFE::exception_to_s($!) - end - end - end - end - def evaluate_definition (raw_definition, workitem) expression = raw_definition.instantiate(workitem) end def remove_environment (environment_id) @@ -556,11 +598,13 @@ env, fei = fetch(environment_id) env.unbind() - get_expression_storage().delete(environment_id) + #get_expression_storage().delete(environment_id) + + onotify :remove, environment_id end def build_workitem (launchitem) wi = InFlowWorkItem.new() @@ -611,11 +655,11 @@ return sDefinition.make() end if sDefinition.kind_of? Class - return sDefinition.do_make(get_expression_map) + return sDefinition.do_make() end raise \ "Cannot deduce process definition " + "out of instance of class #{sDefinition.class}" @@ -638,12 +682,17 @@ end def new_workflow_instance_id () synchronize do + wfid = OpenWFE::current_time_millis() - wfid = wfid + 1 if wfid == @@last_given_instance_id + + wfid = @@last_given_instance_id + 1 \ + if wfid <= @@last_given_instance_id + @@last_given_instance_id = wfid + return wfid.to_s end end #