lib/ruote/exp/flowexpression.rb in ruote-2.1.11 vs lib/ruote/exp/flowexpression.rb in ruote-2.2.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -58,10 +58,11 @@ include Ruote::WithMeta require 'ruote/exp/ro_persist' require 'ruote/exp/ro_attributes' require 'ruote/exp/ro_variables' + require 'ruote/exp/ro_filters' COMMON_ATT_KEYS = %w[ if unless forget timeout on_error on_cancel on_timeout ] attr_reader :context @@ -77,15 +78,16 @@ h_reader :on_error h_reader :on_cancel h_reader :on_timeout - def initialize (context, h) + def initialize(context, h) @context = context @msg = nil + # contains generally the msg the expression got instantiated for self.h = h h._id ||= Ruote.to_storage_id(h.fei) h['type'] ||= 'expressions' @@ -97,26 +99,37 @@ h.on_cancel ||= attribute(:on_cancel) h.on_error ||= attribute(:on_error) h.on_timeout ||= attribute(:on_timeout) end - def h= (hash) + def h=(hash) @h = hash class << h include Ruote::HashDot end end + # Returns the Ruote::FlowExpressionId for this expression. + # def fei + Ruote::FlowExpressionId.new(h.fei) end + # Returns the Ruote::FlowExpressionIf of the parent expression, or nil + # if there is no parent expression. + # def parent_id + h.parent_id ? Ruote::FlowExpressionId.new(h.parent_id) : nil end + # Fetches the parent expression, or returns nil if there is no parent + # expression. + # def parent + Ruote::Exp::FlowExpression.fetch(@context, h.parent_id) end # Turns this FlowExpression instance into a Hash (well, just hands back # the base hash behind it). @@ -126,20 +139,20 @@ @h end # Instantiates expression back from hash. # - def self.from_h (context, h) + def self.from_h(context, h) exp_class = context.expmap.expression_class(h['name']) exp_class.new(context, h) end # Fetches an expression from the storage and readies it for service. # - def self.fetch (context, fei) + def self.fetch(context, fei) return nil if fei.nil? fexp = context.storage.get('expressions', Ruote.to_storage_id(fei)) @@ -150,25 +163,29 @@ # META #++ # Keeping track of names and aliases for the expression # - def self.names (*exp_names) + def self.names(*exp_names) exp_names = exp_names.collect { |n| n.to_s } meta_def(:expression_names) { exp_names } end #-- # apply/reply #++ - def self.do_action (context, msg) + # Called by the worker when it has something to do for a FlowExpression. + # + def self.do_action(context, msg) fei = msg['fei'] action = msg['action'] + p msg unless fei + if action == 'reply' && fei['engine_id'] != context.engine_id # # the reply has to go to another engine, let's locate the # 'engine participant' and give it the workitem/reply # @@ -197,43 +214,66 @@ # this retry system is only useful with ruote-couch fexp.send("do_#{action}", msg) if fexp end - def do_apply + # Called by the worker when it has just created this FlowExpression and + # wants to apply it. + # + def do_apply(msg) + @msg = Ruote.fulldup(msg) + if not Condition.apply?(attribute(:if), attribute(:unless)) return reply_to_parent(h.applied_workitem) end if attribute(:forget).to_s == 'true' - i = h.parent_id + pi = h.parent_id wi = Ruote.fulldup(h.applied_workitem) h.variables = compile_variables h.parent_id = nil h.forgotten = true - @context.storage.put_msg('reply', 'fei' => i, 'workitem' => wi) + @context.storage.put_msg('reply', 'fei' => pi, 'workitem' => wi) if pi + # reply to parent immediately (if there is a parent) + + elsif attribute(:lose).to_s == 'true' + + h.lost = true end + filter + consider_tag consider_timeout apply end - def reply_to_parent (workitem, delete=true) + # FlowExpression call this method when they're done and they want their + # parent expression to take over (it will end up calling the #reply of + # the parent expression). + # + def reply_to_parent(workitem, delete=true) + filter(workitem) + if h.tagname unset_variable(h.tagname) + Ruote::Workitem.remove_tag(workitem, h.tagname) + @context.storage.put_msg( - 'left_tag', 'tag' => h.tagname, 'fei' => h.fei) + 'left_tag', + 'tag' => h.tagname, + 'fei' => h.fei, + 'workitem' => workitem) end if h.timeout_schedule_id && h.state != 'timing_out' @context.storage.delete_schedule(h.timeout_schedule_id) @@ -241,29 +281,30 @@ if h.state == 'failing' # on_error is implicit (#fail got called) trigger('on_error', workitem) - elsif (h.state == 'cancelling') and h.on_cancel + elsif h.state == 'cancelling' and h.on_cancel trigger('on_cancel', workitem) - elsif (h.state == 'cancelling') and h.on_re_apply + elsif h.state == 'cancelling' and h.on_re_apply trigger('on_re_apply', workitem) - elsif (h.state == 'timing_out') and h.on_timeout + elsif h.state == 'timing_out' and h.on_timeout trigger('on_timeout', workitem) + elsif h.lost and h.state == nil + + # do not reply, sit here (and wait for cancellation probably) + else # vanilla reply - #unpersist_or_raise if delete - #try_unpersist if delete - if delete - do_unpersist || return - end + (do_unpersist || return) if delete + # remove expression from storage if h.parent_id @context.storage.put_msg( 'reply', @@ -279,11 +320,13 @@ 'workitem' => workitem) end end end - def do_reply (msg) + # Wraps #reply (does the administrative part of the reply work). + # + def do_reply(msg) @msg = Ruote.fulldup(msg) # keeping the message, for 'retry' in collision cases workitem = msg['workitem'] @@ -316,20 +359,22 @@ # alias :do_receive :do_reply # A default implementation for all the expressions. # - def reply (workitem) + def reply(workitem) reply_to_parent(workitem) end # The raw handling of messages passed to expressions (the fine handling # is done in the #cancel method). # - def do_cancel (msg) + def do_cancel(msg) + @msg = Ruote.fulldup(msg) + flavour = msg['flavour'] return if h.state == 'cancelling' && flavour != 'kill' # cancel on cancel gets discarded @@ -373,11 +418,11 @@ end # This default implementation cancels all the [registered] children # of this expression. # - def cancel (flavour) + def cancel(flavour) return reply_to_parent(h.applied_workitem) if h.children.empty? # # there are no children, nothing to cancel, let's just reply to # the parent expression @@ -418,12 +463,18 @@ # 'fei' => h.fei, # 'flavour' => flavour) #end end - def do_fail (msg) + # Called when handling an on_error, will place itself in a 'failing' state + # and cancel the children (when the reply from the children comes back, + # the on_reply will get triggered). + # + def do_fail(msg) + @msg = Ruote.fulldup(msg) + @h['state'] = 'failing' @h['applied_workitem'] = msg['workitem'] if h.children.size < 1 reply_to_parent(@h['applied_workitem']) @@ -435,19 +486,22 @@ #-- # misc #++ - def launch_sub (pos, subtree, opts={}) + # Launches a subprocesses (usually called from the #apply of certain + # expression implementations. + # + def launch_sub(pos, subtree, opts={}) - i = h.fei.dup - i['sub_wfid'] = get_next_sub_wfid - i['expid'] = pos + i = h.fei.merge( + 'subid' => Ruote.generate_subid(h.fei.inspect), + 'expid' => pos) #p '=== launch_sub ===' - #p [ :launcher, h.fei['expid'], h.fei['sub_wfid'], h.fei['wfid'] ] - #p [ :launched, i['expid'], i['sub_wfid'], i['wfid'] ] + #p [ :launcher, h.fei['expid'], h.fei['subid'], h.fei['wfid'] ] + #p [ :launched, i['expid'], i['subid'], i['wfid'] ] forget = opts[:forget] register_child(i) unless forget @@ -466,11 +520,11 @@ end # Returns true if the given fei points to an expression in the parent # chain of this expression. # - def ancestor? (fei) + def ancestor?(fei) return false unless h.parent_id return true if h.parent_id == fei parent.ancestor?(fei) @@ -487,18 +541,19 @@ elsif h.parent_id par = parent # :( get_parent would probably be a better name for #parent - unless par - puts "~~" - puts "parent gone for" - p h.fei - p h.parent_id - p tree - puts "~~" - end + #if par.nil? && ($DEBUG || ARGV.include?('-d')) + # puts "~~" + # puts "parent gone for" + # puts "fei #{Ruote.sid(h.fei)}" + # puts "tree #{tree.inspect}" + # puts "replying to #{Ruote.sid(h.parent_id)}" + # puts "~~" + #end + # is sometimes helpful during debug sessions par ? par.lookup_on_error : nil else @@ -506,11 +561,11 @@ end end # Looks up parent with on_error attribute and triggers it # - def handle_on_error (msg, error) + def handle_on_error(msg, error) return false if h.state == 'failing' oe_parent = lookup_on_error @@ -522,12 +577,17 @@ return false if handler == '' # empty on_error handler nullifies ancestor's on_error workitem = msg['workitem'] - workitem['fields']['__error__'] = [ - h.fei, Ruote.now_to_utc_s, error.class.to_s, error.message, error.backtrace ] + workitem['fields']['__error__'] = { + 'fei' => fei, + 'at' => Ruote.now_to_utc_s, + 'class' => error.class.to_s, + 'message' => error.message, + 'trace' => error.backtrace + } @context.storage.put_msg( 'fail', 'fei' => oe_parent.h.fei, 'workitem' => workitem) @@ -560,53 +620,47 @@ # # seq.update_tree # seq.updated_tree[2] << [ 'participant', { 'ref' => 'bob' }, [] ] # seq.do_persist # - def update_tree (t=nil) + def update_tree(t=nil) h.updated_tree = t || Ruote.fulldup(h.original_tree) end + # Returns the name of this expression, like 'sequence', 'participant', + # 'cursor', etc... + # def name + tree[0] end + # Returns the attributes of this expression (like { 'ref' => 'toto' } or + # { 'timeout' => '2d' }. + # def attributes + tree[1] end + # Returns the "AST" view on the children of this expression... + # def tree_children + tree[2] end - # A tiny class-bound counter used when generating subprocesses ids. - # - @@sub_wfid_counter = -1 + protected - # Generates a sub_wfid, without hitting storage. + # Returns a Graphviz dot string representing this expression (and its + # children). # - # There's a better implementation for sure... - # - def get_next_sub_wfid + def to_dot(opts) - i = [ - $$, Time.now.to_f.to_s, self.hash.to_s, @h['fei'].inspect - ].join('-').hash - - @@sub_wfid_counter = (@@sub_wfid_counter + 1) % 1000 - i = i * 1000 + (@@sub_wfid_counter) - - (i < 0 ? "1#{i * -1}" : "0#{i}").to_s - end - - protected - - def to_dot (opts) - i = fei() - label = "#{[ i.wfid, i.sub_wfid, i.expid].join(" ")} #{tree.first}" + label = "#{[ i.wfid, i.subid, i.expid].join(' ')} #{tree.first}" label += " (#{h.state})" if h.state a = [] a << "\"#{i.to_storage_id}\" [ label=\"#{label}\" ];" @@ -623,13 +677,18 @@ end a end - def pre_apply_child (child_index, workitem, forget) + # Used locally but also by ConcurrenceExpression, when preparing children + # before they get applied. + # + def pre_apply_child(child_index, workitem, forget) - child_fei = h.fei.merge('expid' => "#{h.fei['expid']}_#{child_index}") + child_fei = h.fei.merge( + 'expid' => "#{h.fei['expid']}_#{child_index}", + 'subid' => Ruote.generate_subid(h.fei.inspect)) h.children << child_fei unless forget msg = { 'fei' => child_fei, @@ -641,33 +700,47 @@ msg['forgotten'] = true if forget msg end - def apply_child (child_index, workitem, forget=false) + # Used by expressions when, well, applying a child expression of theirs. + # + def apply_child(child_index, workitem, forget=false) msg = pre_apply_child(child_index, workitem, forget) persist_or_raise unless forget + # no need to persist the parent (this) if the child is to be forgotten @context.storage.put_msg('apply', msg) end - def register_child (fei) + # Some expressions have to keep track of their (instantiated) children, + # this method does the registration (of the child's fei). + # + def register_child(fei) h.children << fei persist_or_raise end + # Called to check if the expression has a :tag attribute. If yes, + # will register the tag in a variable (and in the workitem). + # def consider_tag if h.tagname = attribute(:tag) set_variable(h.tagname, h.fei) + Ruote::Workitem.add_tag(h.applied_workitem, h.tagname) + @context.storage.put_msg( - 'entered_tag', 'tag' => h.tagname, 'fei' => h.fei) + 'entered_tag', + 'tag' => h.tagname, + 'fei' => h.fei, + 'workitem' => h.applied_workitem) end end # Called by do_apply. Overriden in ParticipantExpression and RefExpression. # @@ -677,13 +750,14 @@ end # Called by consider_timeout (FlowExpression) and schedule_timeout # (ParticipantExpression). # - def do_schedule_timeout (timeout) + def do_schedule_timeout(timeout) - return unless timeout + timeout = timeout.to_s + return if timeout.strip == '' h.timeout_schedule_id = @context.storage.put_schedule( 'at', h.fei, @@ -693,11 +767,11 @@ 'flavour' => 'timeout') end # (Called by trigger_on_cancel & co) # - def supplant_with (tree, opts) + def supplant_with(tree, opts) # at first, nuke self r = try_unpersist @@ -722,31 +796,32 @@ }.merge!(opts)) end # 'on_{error|timeout|cancel|re_apply}' triggering # - def trigger (on, workitem) + def trigger(on, workitem) hon = h[on] t = hon.is_a?(String) ? [ hon, {}, [] ] : hon if on == 'on_error' - if hon == 'redo' + if hon == 'redo' or hon == 'retry' t = tree - elsif hon == 'undo' + elsif hon == 'undo' or hon == 'pass' h.state = 'failed' reply_to_parent(workitem) + return end elsif on == 'on_timeout' - t = tree if hon == 'redo' + t = tree if hon == 'redo' or hon == 'retry' end supplant_with(t, 'trigger' => on) end end