lib/openwfe/expool/expressionpool.rb in openwferu-0.9.13 vs lib/openwfe/expool/expressionpool.rb in openwferu-0.9.14

- old
+ new

@@ -88,15 +88,16 @@ # expressions. # class ExpressionPool include ServiceMixin include OwfeServiceLocator - include Observable + include OwfeObservable include WorkqueueMixin include FeiMixin include MonitorMixin + # # code loaded from a remote URI will get evaluated with # that security level # SAFETY_LEVEL = 2 @@ -111,10 +112,13 @@ @observers = {} @stopped = false + engine_environment_id + # makes sure it's called now + start_workqueue end # # Stops this expression pool (especially its workqueue). @@ -135,11 +139,11 @@ # It avoids the need for the FlowExpression instances to include # the monitor mixin by themselves # def get_monitor (fei) - return @monitors[fei] + @monitors[fei] end # # This method is called by the launch method. It's actually the first # stage of that method. @@ -184,12 +188,15 @@ # before the actual launch is completely over. # # Returns the FlowExpressionId instance of the root expression of # the newly launched flow. # - def launch (launchitem) + def launch (launchitem, options) + # + # prepare raw expression + raw_expression = prepare_raw_expression launchitem # # will raise an exception if there are requirements # and one of them is not met @@ -197,14 +204,20 @@ # # as this expression is the root of a new process instance, # it has to have an environment for all the variables of # the process instance - wi = build_workitem launchitem + raw_expression = wrap_in_schedule(raw_expression, options) \ + if options and options.size > 0 fei = raw_expression.fei + # + # apply prepared raw expression + + wi = build_workitem launchitem + onotify :launch, fei, launchitem apply raw_expression, wi fei @@ -218,36 +231,36 @@ # and of course used by the launch_template() method. # def prepare_from_template ( requesting_expression, sub_id, template, params=nil) - rawexp = if template.is_a? RawExpression + rawexp = if template.is_a?(RawExpression) template - elsif template.is_a? FlowExpressionId + elsif template.is_a?(FlowExpressionId) fetch_expression(template) else build_raw_expression(nil, template) end - #raise "did not find expression at #{template.to_s}" \ + #raise "did not find subprocess in : #{template.to_s}" \ # unless rawexp rawexp = rawexp.dup() rawexp.fei = rawexp.fei.dup() if requesting_expression == nil rawexp.parent_id = nil rawexp.fei.workflow_instance_id = get_wfid_generator.generate - elsif requesting_expression.kind_of? FlowExpressionId + elsif requesting_expression.kind_of?(FlowExpressionId) rawexp.parent_id = requesting_expression rawexp.fei.workflow_instance_id = \ "#{requesting_expression.workflow_instance_id}.#{sub_id}" - elsif requesting_expression.kind_of? String + elsif requesting_expression.kind_of?(String) rawexp.parent_id = nil rawexp.fei.workflow_instance_id = \ "#{requesting_expression}.#{sub_id}" @@ -310,20 +323,18 @@ # # Applies a given expression (id or expression) # def apply (exp, workitem) - #do_apply exp, workitem queue_work :do_apply, exp, workitem end # # Replies to a given expression # def reply (exp, workitem) - #do_reply exp, workitem queue_work :do_reply, exp, workitem end # # Cancels the given expression. @@ -406,10 +417,62 @@ ldebug { "forget() forgot #{fei}" } end # + # Pauses a process (sets its '/__paused__' variable to true). + # + def pause_process (wfid) + + wfid = extract_wfid(wfid) + + root_expression = fetch_root(wfid) + + root_expression.set_variable(VAR_PAUSED, true) + end + + # + # Restarts a process : removes its 'paused' flag (variable) and makes + # sure to 'replay' events (replies) that came for it while it was + # in pause. + # + def resume_process (wfid) + + wfid = extract_wfid(wfid) + + root_expression = fetch_root(wfid) + + # + # remove 'paused' flag + + root_expression.unset_variable(VAR_PAUSED) + + # + # replay + + journal = get_error_journal + + # select PausedError instances in separate list + + errors = journal.get_error_log(wfid) + error_class = PausedError.name + paused_errors = errors.select { |e| e.error_class == error_class } + + return if paused_errors.size < 1 + + # remove them from current error journal + + journal.remove_errors wfid, paused_errors + + # replay select PausedError instances + + paused_errors.each do |e| + journal.replay_at_error e + end + end + + # # Replies to the parent of the given expression. # def reply_to_parent (exp, workitem, remove=true) ldebug { "reply_to_parent() for #{exp.fei.to_debug_s}" } @@ -550,10 +613,12 @@ # # Returns the engine environment (the top level environment) # def fetch_engine_environment () synchronize do + # + # synchronize to ensure that there's 1! engine env eei = engine_environment_id ee, fei = fetch(eei) if not ee @@ -573,11 +638,11 @@ def remove (exp) exp, _fei = fetch(exp) \ if exp.kind_of?(FlowExpressionId) - return if not exp + return unless exp ldebug { "remove() fe #{exp.fei.to_debug_s}" } onotify :remove, exp.fei @@ -601,47 +666,48 @@ synchronize do t = OpenWFE::Timer.new - ldebug { "reschedule() initiating..." } + linfo { "reschedule() initiating..." } get_expression_storage.each_of_kind(Schedulable) do |fe| - ldebug { "reschedule() for #{fe.fei.to_debug_s}..." } + #linfo { "reschedule() for #{fe.fei.to_debug_s}..." } + linfo { "reschedule() for #{fe.fei.to_s}..." } onotify :reschedule, fe.fei fe.reschedule(get_scheduler) end - ldebug { "reschedule() done. (took #{t.duration} ms)" } + linfo { "reschedule() done. (took #{t.duration} ms)" } end end # # Returns the unique engine_environment FlowExpressionId instance. # There is only one such environment in an engine, hence this # 'singleton' method. # def engine_environment_id () - synchronize do + #synchronize do - return @eei if @eei + return @eei if @eei - @eei = FlowExpressionId.new - @eei.owfe_version = OPENWFERU_VERSION - @eei.engine_id = get_engine.service_name - @eei.initial_engine_id = @eei.engine_id - @eei.workflow_definition_url = 'ee' - @eei.workflow_definition_name = 'ee' - @eei.workflow_definition_revision = '0' - @eei.workflow_instance_id = '0' - @eei.expression_name = EN_ENVIRONMENT - @eei.expression_id = '0' - @eei - end + @eei = FlowExpressionId.new + @eei.owfe_version = OPENWFERU_VERSION + @eei.engine_id = get_engine.service_name + @eei.initial_engine_id = @eei.engine_id + @eei.workflow_definition_url = 'ee' + @eei.workflow_definition_name = 'ee' + @eei.workflow_definition_revision = '0' + @eei.workflow_instance_id = '0' + @eei.expression_name = EN_ENVIRONMENT + @eei.expression_id = '0' + @eei + #end end # # Returns the list of applied expressions belonging to a given # workflow instance. @@ -714,51 +780,91 @@ def fetch_expression_with_wfid (wfid) list_processes(false, wfid)[0] end + # + # This method is called when apply() or reply() failed for + # an expression. + # There are currently only two 'users', the ParticipantExpression + # class and the do_process_workelement method of this ExpressionPool + # class. + # + def notify_error (error, fei, message, workitem) + + fei = extract_fei fei + # densha requires that... :( + + se = OpenWFE::exception_to_s(error) + + onotify :error, fei, message, workitem, error.class.name, se + + #fei = extract_fei fei + + if error.is_a?(PausedError) + lwarn do + "#{self.service_name} " + + "operation :#{message.to_s} on #{fei.to_s} " + + "delayed because process '#{fei.wfid}' is in pause" + end + else + lwarn do + "#{self.service_name} " + + "operation :#{message.to_s} on #{fei.to_s} " + + "failed with\n" + se + end + end + end + protected + #-- + # Returns true if it's the fei of a participant + # (or of a subprocess ref) # + #def is_participant? (fei) + # exp_name = fei.expression_name + # return true if exp_name == "participant" + # (get_expression_map.get_class(exp_name) == nil) + #end + #++ + + # # This method is called by the workqueue when processing # the atomic work operations. # def do_process_workelement elt + message, fei, workitem = elt + begin - message, fei, workitem = elt send message, fei, workitem rescue Exception => e - se = OpenWFE::exception_to_s(e) - - onotify :error, fei, message, workitem, se - - fei = extract_fei fei - - lwarn do - "#{self.service_name} " + - "operation :#{message.to_s} on #{fei.to_s} " + - "failed with\n" + se - end + notify_error(e, fei, message, workitem) end end # # The real apply work. # def do_apply (exp, workitem) - exp, fei = fetch(exp) if exp.kind_of? FlowExpressionId + exp, fei = fetch(exp) if exp.kind_of?(FlowExpressionId) + check_if_paused exp + #ldebug { "apply() '#{fei}' (#{fei.class})" } if not exp + lwarn { "apply() cannot apply missing #{fei.to_debug_s}" } return + + #raise "apply() cannot apply missing #{fei.to_debug_s}" end #ldebug { "apply() #{fei.to_debug_s}" } #exp.apply_time = OpenWFE::now() @@ -780,10 +886,12 @@ exp, fei = fetch(exp) ldebug { "reply() to #{fei.to_debug_s}" } ldebug { "reply() from #{workitem.last_expression_id.to_debug_s}" } + check_if_paused exp + if not exp #raise "cannot reply to missing #{fei.to_debug_s}" lwarn { "reply() cannot reply to missing #{fei.to_debug_s}" } return end @@ -792,19 +900,108 @@ exp.reply(workitem) end # + # Will raise an exception if the expression belongs to a paused + # process. + # + def check_if_paused (expression) + + return unless expression + + raise PausedError.new(expression.fei.wfid) \ + if expression.paused? + end + + # + # if the launch method is called with a schedule option + # (like :at, :in, :cron and :every), this method takes care of + # wrapping the process with a sleep or a cron. + # + def wrap_in_schedule (raw_expression, options) + + oat = options[:at] + oin = options[:in] + ocron = options[:cron] + oevery = options[:every] + + fei = new_fei(nil, "schedlaunch", "0", "sequence") + + # not very happy with this code, it builds custom + # wrapping processes manually, maybe there is + # a more elegant way, but for now, it's ok. + + if oat or oin + + seq = get_expression_map.get_class(:sequence) + seq = seq.new(fei, nil, nil, application_context, nil) + + att = if oat + { "until" => oat } + else #oin + { "for" => oin } + end + + sle = get_expression_map.get_class(:sleep) + sle = sle.new(fei.dup, fei, nil, application_context, att) + sle.fei.expression_id = "0.1" + sle.fei.expression_name = "sleep" + seq.children << sle.fei + seq.children << raw_expression.fei + + seq.new_environment + sle.environment_id = seq.environment_id + + sle.store_itself + seq.store_itself + + raw_expression.store_itself + raw_expression = seq + + elsif ocron or oevery + + fei.expression_name = "cron" + + att = if ocron + { "tab" => ocron } + else #oevery + { "every" => oevery } + end + att["name"] = "//cron_launch__#{fei.wfid}" + + cro = get_expression_map.get_class(:cron) + cro = cro.new(fei, nil, nil, application_context, att) + + cro.children << raw_expression.fei + + cro.new_environment + + cro.store_itself + + raw_expression.store_itself + raw_expression = cro + end + # else, don't schedule at all + + raw_expression + end + + # # Removes an environment, especially takes care of unbinding # any special value it may contain. # def remove_environment (environment_id) ldebug { "remove_environment() #{environment_id.to_debug_s}" } env, fei = fetch(environment_id) + return unless env + # + # env already unbound and removed + env.unbind #get_expression_storage().delete(environment_id) onotify :remove, environment_id @@ -889,20 +1086,16 @@ # something that was dumped via YAML # # else it's some ruby code to eval - o = OpenWFE::eval_safely(param, SAFETY_LEVEL) - - return o.do_make \ - if o.is_a?(ProcessDefinition) or o.is_a?(Class) - - o + ProcessDefinition::eval_ruby_process_definition( + param, SAFETY_LEVEL) end # - # Builds a FlowExpressionId instance for process being + # Builds a FlowExpressionId instance for a process being # launched. # def new_fei (launchitem, flow_name, flow_revision, exp_name) url = if launchitem @@ -952,10 +1145,45 @@ fei, nil, nil, @application_context, procdef) end end # + # This error is raised when an expression belonging to a paused + # process is applied or replied to. + # + class PausedError < RuntimeError + + attr_reader :wfid + + def initialize (wfid) + + super "process '#{wfid}' is paused" + @wfid = wfid + end + + # + # Returns a hash for this PausedError instance. + # (simply returns the hash of the paused process' wfid). + # + def hash + + @wfid.hash + end + + # + # Returns true if the other is a PausedError issued for the + # same process instance (wfid). + # + def == (other) + + return false unless other.is_a?(PausedError) + + (@wfid == other.wfid) + end + end + + # # a small help class for storing monitors provided on demand # to expressions that need them # class MonitorProvider include MonitorMixin, Logging @@ -970,18 +1198,9 @@ def [] (key) synchronize do (@monitors[key] ||= Monitor.new) - #if not mon - # #ldebug { "[] creating new Monitor for #{key}" } - # mon = Monitor.new - # @monitors[key] = mon - #else - # #ldebug { "[] already had Monitor for #{key}" } - #end - # - #mon end end def delete (key) synchronize do