lib/ruote/svc/tracker.rb in ruote-2.1.11 vs lib/ruote/svc/tracker.rb in ruote-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -32,11 +32,11 @@
#
# Look at the ListenExpression for more details.
#
class Tracker
- def initialize (context)
+ def initialize(context)
@context = context
if @context.worker
#
@@ -52,56 +52,87 @@
end
# The worker passes all the messages it has to process to the tracker via
# this method.
#
- def notify (msg)
+ def notify(msg)
- doc = @context.storage.get_trackers
+ m_error = msg['error']
+ m_wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil)
+ m_action = msg['action']
- doc['trackers'].values.each do |tracker|
+ @context.storage.get_trackers['trackers'].each do |tracker_id, tracker|
+ # filter msgs
+
t_wfid = tracker['wfid']
t_action = tracker['action']
- m_wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil)
next if t_wfid && t_wfid != m_wfid
- next if t_action && t_action != msg['action']
+ next if t_action && t_action != m_action
next unless does_match?(msg, tracker['conditions'])
+ msg = msg['msg'] if m_action == 'error_intercepted'
+
+ if tracker_id == 'on_error' || tracker_id == 'on_terminate'
+
+ fields = msg['workitem']['fields']
+
+ next if m_action == 'error_intercepted' && fields['__error__']
+ next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__'])
+ end
+
+ # prepare and emit/put 'reaction' message
+
m = tracker['msg']
- @context.storage.put_msg(
- m.delete('action'),
- m.merge!('workitem' => msg['workitem']))
+ action = m.delete('action')
+
+ m['wfid'] = m_wfid if m['wfid'] == 'replace'
+ m['wfid'] ||= @context.wfidgen.generate
+
+ m['workitem'] = msg['workitem'] if m['workitem'] == 'replace'
+
+ if tracker_id == 'on_error' && m_action == 'error_intercepted'
+ m['workitem']['fields']['__error__'] = m_error
+ elsif tracker_id == 'on_terminate' && m_action == 'terminated'
+ m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid }
+ end
+
+ if m['variables'] == 'compile'
+ fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei'])
+ m['variables'] = fexp ? fexp.compile_variables : {}
+ end
+
+ @context.storage.put_msg(action, m)
end
end
# Adds a tracker (usually when a 'listen' expression gets applied).
#
- def add_tracker (wfid, action, fei, conditions, msg, doc=nil)
+ def add_tracker(wfid, action, id, conditions, msg)
- doc ||= @context.storage.get_trackers
+ doc = @context.storage.get_trackers
- doc['trackers'][Ruote.to_storage_id(fei)] =
+ doc['trackers'][id] =
{ 'wfid' => wfid,
'action' => action,
- 'fei' => fei,
+ 'id' => id,
'conditions' => conditions,
'msg' => msg }
r = @context.storage.put(doc)
- add_tracker(wfid, action, fei, msg, r) if r
+ add_tracker(wfid, action, id, msg) if r
# the put failed, have to redo the work
end
# Removes a tracker (usually when a 'listen' expression replies to its
# parent expression or is cancelled).
#
- def remove_tracker (fei, doc=nil)
+ def remove_tracker(fei, doc=nil)
doc ||= @context.storage.get_trackers
doc['trackers'].delete(Ruote.to_storage_id(fei))
@@ -111,14 +142,23 @@
# the put failed, have to redo the work
end
protected
- def does_match? (msg, conditions)
+ def does_match?(msg, conditions)
+ return true unless conditions
+
conditions.each do |k, v|
+
val = msg[k]
- return false unless val && val.match(v)
+ v = Ruote.regex_or_s(v)
+
+ if v.is_a?(String)
+ return false unless val && v == val
+ else
+ return false unless val && v.match(val)
+ end
end
true
end
end