lib/ruote/svc/tracker.rb in ruote-2.2.0 vs lib/ruote/svc/tracker.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -35,68 +35,58 @@
class Tracker
def initialize(context)
@context = context
-
- if @context.worker
- #
- # this is a worker context, DO log
- #
- @context.worker.subscribe(:all, self)
- #else
- #
- # this is not a worker context, no notifications. BUT
- # honour calls to add_tracker/remove_tracker
- #
- end
end
- # The worker passes all the messages it has to process to the tracker via
- # this method.
+ # The context calls this method for each successfully processed msg
+ # in the worker.
#
- def notify(msg)
+ def on_msg(message)
- m_error = msg['error']
- m_wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil)
- m_action = msg['action']
+ m_error = message['error']
+ m_wfid = message['wfid'] || (message['fei']['wfid'] rescue nil)
+ m_action = message['action']
+ msg = m_action == 'error_intercepted' ? message['msg'] : message
+
@context.storage.get_trackers['trackers'].each do |tracker_id, tracker|
# filter msgs
t_wfid = tracker['wfid']
t_action = tracker['action']
next if t_wfid && t_wfid != m_wfid
next if t_action && t_action != m_action
- next unless does_match?(msg, tracker['conditions'])
+ next unless does_match?(message, tracker['conditions'])
- msg = msg['msg'] if m_action == 'error_intercepted'
-
if tracker_id == 'on_error' || tracker_id == 'on_terminate'
fields = msg['workitem']['fields']
next if m_action == 'error_intercepted' && fields['__error__']
next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__'])
end
# prepare and emit/put 'reaction' message
- m = tracker['msg']
+ m = Ruote.fulldup(tracker['msg'])
action = m.delete('action')
m['wfid'] = m_wfid if m['wfid'] == 'replace'
m['wfid'] ||= @context.wfidgen.generate
m['workitem'] = msg['workitem'] if m['workitem'] == 'replace'
- if tracker_id == 'on_error' && m_action == 'error_intercepted'
+ if t_action == 'error_intercepted'
m['workitem']['fields']['__error__'] = m_error
+ elsif tracker_id == 'on_error' && m_action == 'error_intercepted'
+ m['workitem']['fields']['__error__'] = m_error
elsif tracker_id == 'on_terminate' && m_action == 'terminated'
m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid }
end
if m['variables'] == 'compile'
@@ -110,10 +100,13 @@
# Adds a tracker (usually when a 'listen' expression gets applied).
#
def add_tracker(wfid, action, id, conditions, msg)
+ conditions =
+ conditions && conditions.remap { |(k, v), h| h[k] = Array(v) }
+
doc = @context.storage.get_trackers
doc['trackers'][id] =
{ 'wfid' => wfid,
'action' => action,
@@ -121,11 +114,11 @@
'conditions' => conditions,
'msg' => msg }
r = @context.storage.put(doc)
- add_tracker(wfid, action, id, msg) if r
+ add_tracker(wfid, action, id, conditions, msg) if r
# the put failed, have to redo the work
end
# Removes a tracker (usually when a 'listen' expression replies to its
# parent expression or is cancelled).
@@ -148,16 +141,23 @@
return true unless conditions
conditions.each do |k, v|
- val = msg[k]
- v = Ruote.regex_or_s(v)
+ return false unless Array(v).find do |vv|
- if v.is_a?(String)
- return false unless val && v == val
- else
- return false unless val && v.match(val)
+ # the Array(v) is for backward compatibility, although newer
+ # track conditions are already stored as arrays.
+
+ vv = Ruote.regex_or_s(vv)
+
+ val = case k
+ when 'class' then msg['error']['class']
+ when 'message' then msg['error']['message']
+ else msg[k]
+ end
+
+ val && (vv.is_a?(String) ? (vv == val) : vv.match(val))
end
end
true
end