lib/ruote/svc/tracker.rb in ruote-2.2.0 vs lib/ruote/svc/tracker.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -35,68 +35,58 @@ class Tracker def initialize(context) @context = context - - if @context.worker - # - # this is a worker context, DO log - # - @context.worker.subscribe(:all, self) - #else - # - # this is not a worker context, no notifications. BUT - # honour calls to add_tracker/remove_tracker - # - end end - # The worker passes all the messages it has to process to the tracker via - # this method. + # The context calls this method for each successfully processed msg + # in the worker. # - def notify(msg) + def on_msg(message) - m_error = msg['error'] - m_wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil) - m_action = msg['action'] + m_error = message['error'] + m_wfid = message['wfid'] || (message['fei']['wfid'] rescue nil) + m_action = message['action'] + msg = m_action == 'error_intercepted' ? message['msg'] : message + @context.storage.get_trackers['trackers'].each do |tracker_id, tracker| # filter msgs t_wfid = tracker['wfid'] t_action = tracker['action'] next if t_wfid && t_wfid != m_wfid next if t_action && t_action != m_action - next unless does_match?(msg, tracker['conditions']) + next unless does_match?(message, tracker['conditions']) - msg = msg['msg'] if m_action == 'error_intercepted' - if tracker_id == 'on_error' || tracker_id == 'on_terminate' fields = msg['workitem']['fields'] next if m_action == 'error_intercepted' && fields['__error__'] next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__']) end # prepare and emit/put 'reaction' message - m = tracker['msg'] + m = Ruote.fulldup(tracker['msg']) action = m.delete('action') m['wfid'] = m_wfid if m['wfid'] == 'replace' m['wfid'] ||= @context.wfidgen.generate m['workitem'] = msg['workitem'] if m['workitem'] == 'replace' - if tracker_id == 'on_error' && m_action == 'error_intercepted' + if t_action == 'error_intercepted' m['workitem']['fields']['__error__'] = m_error + elsif tracker_id == 'on_error' && m_action == 'error_intercepted' + m['workitem']['fields']['__error__'] = m_error elsif tracker_id == 'on_terminate' && m_action == 'terminated' m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid } end if m['variables'] == 'compile' @@ -110,10 +100,13 @@ # Adds a tracker (usually when a 'listen' expression gets applied). # def add_tracker(wfid, action, id, conditions, msg) + conditions = + conditions && conditions.remap { |(k, v), h| h[k] = Array(v) } + doc = @context.storage.get_trackers doc['trackers'][id] = { 'wfid' => wfid, 'action' => action, @@ -121,11 +114,11 @@ 'conditions' => conditions, 'msg' => msg } r = @context.storage.put(doc) - add_tracker(wfid, action, id, msg) if r + add_tracker(wfid, action, id, conditions, msg) if r # the put failed, have to redo the work end # Removes a tracker (usually when a 'listen' expression replies to its # parent expression or is cancelled). @@ -148,16 +141,23 @@ return true unless conditions conditions.each do |k, v| - val = msg[k] - v = Ruote.regex_or_s(v) + return false unless Array(v).find do |vv| - if v.is_a?(String) - return false unless val && v == val - else - return false unless val && v.match(val) + # the Array(v) is for backward compatibility, although newer + # track conditions are already stored as arrays. + + vv = Ruote.regex_or_s(vv) + + val = case k + when 'class' then msg['error']['class'] + when 'message' then msg['error']['message'] + else msg[k] + end + + val && (vv.is_a?(String) ? (vv == val) : vv.match(val)) end end true end