lib/ruote/exp/flowexpression.rb in ruote-2.1.11 vs lib/ruote/exp/flowexpression.rb in ruote-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -58,10 +58,11 @@
include Ruote::WithMeta
require 'ruote/exp/ro_persist'
require 'ruote/exp/ro_attributes'
require 'ruote/exp/ro_variables'
+ require 'ruote/exp/ro_filters'
COMMON_ATT_KEYS = %w[
if unless forget timeout on_error on_cancel on_timeout ]
attr_reader :context
@@ -77,15 +78,16 @@
h_reader :on_error
h_reader :on_cancel
h_reader :on_timeout
- def initialize (context, h)
+ def initialize(context, h)
@context = context
@msg = nil
+ # contains generally the msg the expression got instantiated for
self.h = h
h._id ||= Ruote.to_storage_id(h.fei)
h['type'] ||= 'expressions'
@@ -97,26 +99,37 @@
h.on_cancel ||= attribute(:on_cancel)
h.on_error ||= attribute(:on_error)
h.on_timeout ||= attribute(:on_timeout)
end
- def h= (hash)
+ def h=(hash)
@h = hash
class << h
include Ruote::HashDot
end
end
+ # Returns the Ruote::FlowExpressionId for this expression.
+ #
def fei
+
Ruote::FlowExpressionId.new(h.fei)
end
+ # Returns the Ruote::FlowExpressionIf of the parent expression, or nil
+ # if there is no parent expression.
+ #
def parent_id
+
h.parent_id ? Ruote::FlowExpressionId.new(h.parent_id) : nil
end
+ # Fetches the parent expression, or returns nil if there is no parent
+ # expression.
+ #
def parent
+
Ruote::Exp::FlowExpression.fetch(@context, h.parent_id)
end
# Turns this FlowExpression instance into a Hash (well, just hands back
# the base hash behind it).
@@ -126,20 +139,20 @@
@h
end
# Instantiates expression back from hash.
#
- def self.from_h (context, h)
+ def self.from_h(context, h)
exp_class = context.expmap.expression_class(h['name'])
exp_class.new(context, h)
end
# Fetches an expression from the storage and readies it for service.
#
- def self.fetch (context, fei)
+ def self.fetch(context, fei)
return nil if fei.nil?
fexp = context.storage.get('expressions', Ruote.to_storage_id(fei))
@@ -150,25 +163,29 @@
# META
#++
# Keeping track of names and aliases for the expression
#
- def self.names (*exp_names)
+ def self.names(*exp_names)
exp_names = exp_names.collect { |n| n.to_s }
meta_def(:expression_names) { exp_names }
end
#--
# apply/reply
#++
- def self.do_action (context, msg)
+ # Called by the worker when it has something to do for a FlowExpression.
+ #
+ def self.do_action(context, msg)
fei = msg['fei']
action = msg['action']
+ p msg unless fei
+
if action == 'reply' && fei['engine_id'] != context.engine_id
#
# the reply has to go to another engine, let's locate the
# 'engine participant' and give it the workitem/reply
#
@@ -197,43 +214,66 @@
# this retry system is only useful with ruote-couch
fexp.send("do_#{action}", msg) if fexp
end
- def do_apply
+ # Called by the worker when it has just created this FlowExpression and
+ # wants to apply it.
+ #
+ def do_apply(msg)
+ @msg = Ruote.fulldup(msg)
+
if not Condition.apply?(attribute(:if), attribute(:unless))
return reply_to_parent(h.applied_workitem)
end
if attribute(:forget).to_s == 'true'
- i = h.parent_id
+ pi = h.parent_id
wi = Ruote.fulldup(h.applied_workitem)
h.variables = compile_variables
h.parent_id = nil
h.forgotten = true
- @context.storage.put_msg('reply', 'fei' => i, 'workitem' => wi)
+ @context.storage.put_msg('reply', 'fei' => pi, 'workitem' => wi) if pi
+ # reply to parent immediately (if there is a parent)
+
+ elsif attribute(:lose).to_s == 'true'
+
+ h.lost = true
end
+ filter
+
consider_tag
consider_timeout
apply
end
- def reply_to_parent (workitem, delete=true)
+ # FlowExpression call this method when they're done and they want their
+ # parent expression to take over (it will end up calling the #reply of
+ # the parent expression).
+ #
+ def reply_to_parent(workitem, delete=true)
+ filter(workitem)
+
if h.tagname
unset_variable(h.tagname)
+ Ruote::Workitem.remove_tag(workitem, h.tagname)
+
@context.storage.put_msg(
- 'left_tag', 'tag' => h.tagname, 'fei' => h.fei)
+ 'left_tag',
+ 'tag' => h.tagname,
+ 'fei' => h.fei,
+ 'workitem' => workitem)
end
if h.timeout_schedule_id && h.state != 'timing_out'
@context.storage.delete_schedule(h.timeout_schedule_id)
@@ -241,29 +281,30 @@
if h.state == 'failing' # on_error is implicit (#fail got called)
trigger('on_error', workitem)
- elsif (h.state == 'cancelling') and h.on_cancel
+ elsif h.state == 'cancelling' and h.on_cancel
trigger('on_cancel', workitem)
- elsif (h.state == 'cancelling') and h.on_re_apply
+ elsif h.state == 'cancelling' and h.on_re_apply
trigger('on_re_apply', workitem)
- elsif (h.state == 'timing_out') and h.on_timeout
+ elsif h.state == 'timing_out' and h.on_timeout
trigger('on_timeout', workitem)
+ elsif h.lost and h.state == nil
+
+ # do not reply, sit here (and wait for cancellation probably)
+
else # vanilla reply
- #unpersist_or_raise if delete
- #try_unpersist if delete
- if delete
- do_unpersist || return
- end
+ (do_unpersist || return) if delete
+ # remove expression from storage
if h.parent_id
@context.storage.put_msg(
'reply',
@@ -279,11 +320,13 @@
'workitem' => workitem)
end
end
end
- def do_reply (msg)
+ # Wraps #reply (does the administrative part of the reply work).
+ #
+ def do_reply(msg)
@msg = Ruote.fulldup(msg)
# keeping the message, for 'retry' in collision cases
workitem = msg['workitem']
@@ -316,20 +359,22 @@
#
alias :do_receive :do_reply
# A default implementation for all the expressions.
#
- def reply (workitem)
+ def reply(workitem)
reply_to_parent(workitem)
end
# The raw handling of messages passed to expressions (the fine handling
# is done in the #cancel method).
#
- def do_cancel (msg)
+ def do_cancel(msg)
+ @msg = Ruote.fulldup(msg)
+
flavour = msg['flavour']
return if h.state == 'cancelling' && flavour != 'kill'
# cancel on cancel gets discarded
@@ -373,11 +418,11 @@
end
# This default implementation cancels all the [registered] children
# of this expression.
#
- def cancel (flavour)
+ def cancel(flavour)
return reply_to_parent(h.applied_workitem) if h.children.empty?
#
# there are no children, nothing to cancel, let's just reply to
# the parent expression
@@ -418,12 +463,18 @@
# 'fei' => h.fei,
# 'flavour' => flavour)
#end
end
- def do_fail (msg)
+ # Called when handling an on_error, will place itself in a 'failing' state
+ # and cancel the children (when the reply from the children comes back,
+ # the on_reply will get triggered).
+ #
+ def do_fail(msg)
+ @msg = Ruote.fulldup(msg)
+
@h['state'] = 'failing'
@h['applied_workitem'] = msg['workitem']
if h.children.size < 1
reply_to_parent(@h['applied_workitem'])
@@ -435,19 +486,22 @@
#--
# misc
#++
- def launch_sub (pos, subtree, opts={})
+ # Launches a subprocesses (usually called from the #apply of certain
+ # expression implementations.
+ #
+ def launch_sub(pos, subtree, opts={})
- i = h.fei.dup
- i['sub_wfid'] = get_next_sub_wfid
- i['expid'] = pos
+ i = h.fei.merge(
+ 'subid' => Ruote.generate_subid(h.fei.inspect),
+ 'expid' => pos)
#p '=== launch_sub ==='
- #p [ :launcher, h.fei['expid'], h.fei['sub_wfid'], h.fei['wfid'] ]
- #p [ :launched, i['expid'], i['sub_wfid'], i['wfid'] ]
+ #p [ :launcher, h.fei['expid'], h.fei['subid'], h.fei['wfid'] ]
+ #p [ :launched, i['expid'], i['subid'], i['wfid'] ]
forget = opts[:forget]
register_child(i) unless forget
@@ -466,11 +520,11 @@
end
# Returns true if the given fei points to an expression in the parent
# chain of this expression.
#
- def ancestor? (fei)
+ def ancestor?(fei)
return false unless h.parent_id
return true if h.parent_id == fei
parent.ancestor?(fei)
@@ -487,18 +541,19 @@
elsif h.parent_id
par = parent
# :( get_parent would probably be a better name for #parent
- unless par
- puts "~~"
- puts "parent gone for"
- p h.fei
- p h.parent_id
- p tree
- puts "~~"
- end
+ #if par.nil? && ($DEBUG || ARGV.include?('-d'))
+ # puts "~~"
+ # puts "parent gone for"
+ # puts "fei #{Ruote.sid(h.fei)}"
+ # puts "tree #{tree.inspect}"
+ # puts "replying to #{Ruote.sid(h.parent_id)}"
+ # puts "~~"
+ #end
+ # is sometimes helpful during debug sessions
par ? par.lookup_on_error : nil
else
@@ -506,11 +561,11 @@
end
end
# Looks up parent with on_error attribute and triggers it
#
- def handle_on_error (msg, error)
+ def handle_on_error(msg, error)
return false if h.state == 'failing'
oe_parent = lookup_on_error
@@ -522,12 +577,17 @@
return false if handler == ''
# empty on_error handler nullifies ancestor's on_error
workitem = msg['workitem']
- workitem['fields']['__error__'] = [
- h.fei, Ruote.now_to_utc_s, error.class.to_s, error.message, error.backtrace ]
+ workitem['fields']['__error__'] = {
+ 'fei' => fei,
+ 'at' => Ruote.now_to_utc_s,
+ 'class' => error.class.to_s,
+ 'message' => error.message,
+ 'trace' => error.backtrace
+ }
@context.storage.put_msg(
'fail',
'fei' => oe_parent.h.fei,
'workitem' => workitem)
@@ -560,53 +620,47 @@
#
# seq.update_tree
# seq.updated_tree[2] << [ 'participant', { 'ref' => 'bob' }, [] ]
# seq.do_persist
#
- def update_tree (t=nil)
+ def update_tree(t=nil)
h.updated_tree = t || Ruote.fulldup(h.original_tree)
end
+ # Returns the name of this expression, like 'sequence', 'participant',
+ # 'cursor', etc...
+ #
def name
+
tree[0]
end
+ # Returns the attributes of this expression (like { 'ref' => 'toto' } or
+ # { 'timeout' => '2d' }.
+ #
def attributes
+
tree[1]
end
+ # Returns the "AST" view on the children of this expression...
+ #
def tree_children
+
tree[2]
end
- # A tiny class-bound counter used when generating subprocesses ids.
- #
- @@sub_wfid_counter = -1
+ protected
- # Generates a sub_wfid, without hitting storage.
+ # Returns a Graphviz dot string representing this expression (and its
+ # children).
#
- # There's a better implementation for sure...
- #
- def get_next_sub_wfid
+ def to_dot(opts)
- i = [
- $$, Time.now.to_f.to_s, self.hash.to_s, @h['fei'].inspect
- ].join('-').hash
-
- @@sub_wfid_counter = (@@sub_wfid_counter + 1) % 1000
- i = i * 1000 + (@@sub_wfid_counter)
-
- (i < 0 ? "1#{i * -1}" : "0#{i}").to_s
- end
-
- protected
-
- def to_dot (opts)
-
i = fei()
- label = "#{[ i.wfid, i.sub_wfid, i.expid].join(" ")} #{tree.first}"
+ label = "#{[ i.wfid, i.subid, i.expid].join(' ')} #{tree.first}"
label += " (#{h.state})" if h.state
a = []
a << "\"#{i.to_storage_id}\" [ label=\"#{label}\" ];"
@@ -623,13 +677,18 @@
end
a
end
- def pre_apply_child (child_index, workitem, forget)
+ # Used locally but also by ConcurrenceExpression, when preparing children
+ # before they get applied.
+ #
+ def pre_apply_child(child_index, workitem, forget)
- child_fei = h.fei.merge('expid' => "#{h.fei['expid']}_#{child_index}")
+ child_fei = h.fei.merge(
+ 'expid' => "#{h.fei['expid']}_#{child_index}",
+ 'subid' => Ruote.generate_subid(h.fei.inspect))
h.children << child_fei unless forget
msg = {
'fei' => child_fei,
@@ -641,33 +700,47 @@
msg['forgotten'] = true if forget
msg
end
- def apply_child (child_index, workitem, forget=false)
+ # Used by expressions when, well, applying a child expression of theirs.
+ #
+ def apply_child(child_index, workitem, forget=false)
msg = pre_apply_child(child_index, workitem, forget)
persist_or_raise unless forget
+ # no need to persist the parent (this) if the child is to be forgotten
@context.storage.put_msg('apply', msg)
end
- def register_child (fei)
+ # Some expressions have to keep track of their (instantiated) children,
+ # this method does the registration (of the child's fei).
+ #
+ def register_child(fei)
h.children << fei
persist_or_raise
end
+ # Called to check if the expression has a :tag attribute. If yes,
+ # will register the tag in a variable (and in the workitem).
+ #
def consider_tag
if h.tagname = attribute(:tag)
set_variable(h.tagname, h.fei)
+ Ruote::Workitem.add_tag(h.applied_workitem, h.tagname)
+
@context.storage.put_msg(
- 'entered_tag', 'tag' => h.tagname, 'fei' => h.fei)
+ 'entered_tag',
+ 'tag' => h.tagname,
+ 'fei' => h.fei,
+ 'workitem' => h.applied_workitem)
end
end
# Called by do_apply. Overriden in ParticipantExpression and RefExpression.
#
@@ -677,13 +750,14 @@
end
# Called by consider_timeout (FlowExpression) and schedule_timeout
# (ParticipantExpression).
#
- def do_schedule_timeout (timeout)
+ def do_schedule_timeout(timeout)
- return unless timeout
+ timeout = timeout.to_s
+
return if timeout.strip == ''
h.timeout_schedule_id = @context.storage.put_schedule(
'at',
h.fei,
@@ -693,11 +767,11 @@
'flavour' => 'timeout')
end
# (Called by trigger_on_cancel & co)
#
- def supplant_with (tree, opts)
+ def supplant_with(tree, opts)
# at first, nuke self
r = try_unpersist
@@ -722,31 +796,32 @@
}.merge!(opts))
end
# 'on_{error|timeout|cancel|re_apply}' triggering
#
- def trigger (on, workitem)
+ def trigger(on, workitem)
hon = h[on]
t = hon.is_a?(String) ? [ hon, {}, [] ] : hon
if on == 'on_error'
- if hon == 'redo'
+ if hon == 'redo' or hon == 'retry'
t = tree
- elsif hon == 'undo'
+ elsif hon == 'undo' or hon == 'pass'
h.state = 'failed'
reply_to_parent(workitem)
+
return
end
elsif on == 'on_timeout'
- t = tree if hon == 'redo'
+ t = tree if hon == 'redo' or hon == 'retry'
end
supplant_with(t, 'trigger' => on)
end
end