lib/ruote/svc/tracker.rb in ruote-2.3.0.1 vs lib/ruote/svc/tracker.rb in ruote-2.3.0.2
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -37,23 +37,114 @@
def initialize(context)
@context = context
end
- # The context calls this method for each successfully processed msg
- # in the worker.
+ # The worker calls this method via the context before each msg gets
+ # processed.
#
- def on_msg(message)
+ def on_pre_msg(msg)
- m_error = message['error']
+ on_message(true, msg)
+ end
+
+ # The worker calls this method via the context after each successful
+ # msg processing.
+ #
+ def on_msg(msg)
+
+ on_message(false, msg)
+ end
+
+ # Adds a tracker (usually when a 'listen' expression gets applied).
+ #
+ # The tracker_id may be nil (one will then get generated).
+ #
+ # Returns the tracker_id.
+ #
+ def add_tracker(wfid, action, tracker_id, conditions, msg)
+
+ tracker_id ||= [
+ 'tracker', wfid, action,
+ Ruote.generate_subid(conditions.hash.to_s + msg.hash.to_s)
+ ].collect(&:to_s).join('_')
+
+ conditions =
+ conditions && conditions.remap { |(k, v), h| h[k] = Array(v) }
+
+ doc = @context.storage.get_trackers
+
+ doc['trackers'][tracker_id] =
+ { 'wfid' => wfid,
+ 'action' => action,
+ 'id' => tracker_id,
+ 'conditions' => conditions,
+ 'msg' => msg }
+
+ r = @context.storage.put(doc)
+
+ add_tracker(wfid, action, tracker_id, conditions, msg) if r
+ # the put failed, have to redo the work
+
+ tracker_id
+ end
+
+ # Removes a tracker (usually when a 'listen' expression replies to its
+ # parent expression or is cancelled).
+ #
+ def remove_tracker(fei_sid_or_id, wfid=nil)
+
+ tracker_id =
+ if fei_sid_or_id.is_a?(String)
+ fei_sid_or_id
+ else
+ Ruote.to_storage_id(fei_sid_or_id)
+ end
+
+ remove([ tracker_id ], wfid)
+ end
+
+ protected
+
+ # Removes a set of tracker ids and updated the tracker document.
+ #
+ def remove(tracker_ids, wfid)
+
+ return if tracker_ids.empty?
+
+ doc ||= @context.storage.get_trackers(wfid)
+
+ return if (doc['trackers'].keys & tracker_ids).empty?
+
+ doc['wfid'] = wfid
+ # a little helper for some some storage implementations like ruote-swf
+ # they need to know what workflow execution is targetted.
+
+ tracker_ids.each { |ti| doc['trackers'].delete(ti) }
+ r = @context.storage.put(doc)
+
+ remove(tracker_ids, wfid) if r
+ # the put failed, have to redo the work
+ end
+
+ # The method behind on_pre_msg and on_msg. Filters msgs against trackers.
+ # Triggers trackers if there is a match.
+ #
+ def on_message(pre, message)
+
m_wfid = message['wfid'] || (message['fei']['wfid'] rescue nil)
+ m_error = message['error']
+
m_action = message['action']
+ m_action = "pre_#{m_action}" if pre
msg = m_action == 'error_intercepted' ? message['msg'] : message
- @context.storage.get_trackers['trackers'].each do |tracker_id, tracker|
+ ids_to_remove = []
+ trackers.each do |tracker_id, tracker|
+
# filter msgs
t_wfid = tracker['wfid']
t_action = tracker['action']
@@ -62,83 +153,88 @@
next unless does_match?(message, tracker['conditions'])
if tracker_id == 'on_error' || tracker_id == 'on_terminate'
- fields = msg['workitem']['fields']
+ fs = msg['workitem']['fields']
- next if m_action == 'error_intercepted' && fields['__error__']
- next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__'])
+ next if m_action == 'error_intercepted' && fs['__error__']
+ next if m_action == 'terminated' && (fs['__error__'] || fs['__terminate__'])
end
- # prepare and emit/put 'reaction' message
+ # remove the message post-trigger?
- m = Ruote.fulldup(tracker['msg'])
+ ids_to_remove << tracker_id if tracker['msg'].delete('_auto_remove')
- action = m.delete('action')
+ # OK, have to pull the trigger (or alter the message) then
- m['wfid'] = m_wfid if m['wfid'] == 'replace'
- m['wfid'] ||= @context.wfidgen.generate
-
- m['workitem'] = msg['workitem'] if m['workitem'] == 'replace'
-
- if t_action == 'error_intercepted'
- m['workitem']['fields']['__error__'] = m_error
- elsif tracker_id == 'on_error' && m_action == 'error_intercepted'
- m['workitem']['fields']['__error__'] = m_error
- elsif tracker_id == 'on_terminate' && m_action == 'terminated'
- m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid }
+ if pre && tracker['msg']['_alter']
+ alter(m_wfid, m_error, m_action, msg, tracker)
+ else
+ trigger(m_wfid, m_error, m_action, msg, tracker)
end
+ end
- if m['variables'] == 'compile'
- fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei'])
- m['variables'] = fexp ? fexp.compile_variables : {}
- end
+ remove(ids_to_remove, nil)
+ end
- @context.storage.put_msg(action, m)
+ # Alters the msg, only called in "pre" mode.
+ #
+ def alter(m_wfid, m_error, m_action, msg, tracker)
+
+ case tracker['msg'].delete('_alter')
+ when 'merge' then msg.merge!(tracker['msg'])
+ #else ...
end
end
- # Adds a tracker (usually when a 'listen' expression gets applied).
+ # Prepares the message that gets placed on the ruote msg queue.
#
- def add_tracker(wfid, action, id, conditions, msg)
+ def trigger(m_wfid, m_error, m_action, msg, tracker)
- conditions =
- conditions && conditions.remap { |(k, v), h| h[k] = Array(v) }
+ t_action = tracker['action']
+ tracker_id = tracker['id']
- doc = @context.storage.get_trackers
+ m = Ruote.fulldup(tracker['msg'])
- doc['trackers'][id] =
- { 'wfid' => wfid,
- 'action' => action,
- 'id' => id,
- 'conditions' => conditions,
- 'msg' => msg }
+ action = m.delete('action')
- r = @context.storage.put(doc)
+ m['wfid'] = m_wfid if m['wfid'] == 'replace'
+ m['wfid'] ||= @context.wfidgen.generate
- add_tracker(wfid, action, id, conditions, msg) if r
- # the put failed, have to redo the work
- end
+ m['workitem'] = msg['workitem'] if m['workitem'] == 'replace'
- # Removes a tracker (usually when a 'listen' expression replies to its
- # parent expression or is cancelled).
- #
- def remove_tracker(fei, doc=nil)
+ if t_action == 'error_intercepted'
+ m['workitem']['fields']['__error__'] = m_error
+ elsif tracker_id == 'on_error' && m_action == 'error_intercepted'
+ m['workitem']['fields']['__error__'] = m_error
+ elsif tracker_id == 'on_terminate' && m_action == 'terminated'
+ m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid }
+ end
- doc ||= @context.storage.get_trackers
+ if m['variables'] == 'compile'
+ fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei'])
+ m['variables'] = fexp ? fexp.compile_variables : {}
+ end
- doc['trackers'].delete(Ruote.to_storage_id(fei))
+ @context.storage.put_msg(action, m)
+ end
- r = @context.storage.put(doc)
+ # Returns the trackers currently registered.
+ #
+ # Note: this is called from on_pre_msg and on_msg, hence two times
+ # for a single msg. We trust the storage implementation to cache it
+ # for us.
+ #
+ def trackers
- remove_tracker(fei, r) if r
- # the put failed, have to redo the work
+ @context.storage.get_trackers['trackers']
end
- protected
-
+ # Given a msg and a hash of conditions, returns true if the msg
+ # matches the conditions.
+ #
def does_match?(msg, conditions)
return true unless conditions
conditions.each do |k, v|
@@ -149,15 +245,17 @@
# track conditions are already stored as arrays.
vv = Ruote.regex_or_s(vv)
val = case k
+
when 'class' then msg['error']['class']
when 'message' then msg['error']['message']
- else msg[k]
+
+ else Ruote.lookup(msg, k)
end
- val && (vv.is_a?(String) ? (vv == val) : vv.match(val))
+ val && (vv.is_a?(Regexp) ? vv.match(val) : vv == val)
end
end
true
end