lib/ruote/svc/tracker.rb in ruote-2.1.11 vs lib/ruote/svc/tracker.rb in ruote-2.2.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -32,11 +32,11 @@ # # Look at the ListenExpression for more details. # class Tracker - def initialize (context) + def initialize(context) @context = context if @context.worker # @@ -52,56 +52,87 @@ end # The worker passes all the messages it has to process to the tracker via # this method. # - def notify (msg) + def notify(msg) - doc = @context.storage.get_trackers + m_error = msg['error'] + m_wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil) + m_action = msg['action'] - doc['trackers'].values.each do |tracker| + @context.storage.get_trackers['trackers'].each do |tracker_id, tracker| + # filter msgs + t_wfid = tracker['wfid'] t_action = tracker['action'] - m_wfid = msg['wfid'] || (msg['fei']['wfid'] rescue nil) next if t_wfid && t_wfid != m_wfid - next if t_action && t_action != msg['action'] + next if t_action && t_action != m_action next unless does_match?(msg, tracker['conditions']) + msg = msg['msg'] if m_action == 'error_intercepted' + + if tracker_id == 'on_error' || tracker_id == 'on_terminate' + + fields = msg['workitem']['fields'] + + next if m_action == 'error_intercepted' && fields['__error__'] + next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__']) + end + + # prepare and emit/put 'reaction' message + m = tracker['msg'] - @context.storage.put_msg( - m.delete('action'), - m.merge!('workitem' => msg['workitem'])) + action = m.delete('action') + + m['wfid'] = m_wfid if m['wfid'] == 'replace' + m['wfid'] ||= @context.wfidgen.generate + + m['workitem'] = msg['workitem'] if m['workitem'] == 'replace' + + if tracker_id == 'on_error' && m_action == 'error_intercepted' + m['workitem']['fields']['__error__'] = m_error + elsif tracker_id == 'on_terminate' && m_action == 'terminated' + m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid } + end + + if m['variables'] == 'compile' + fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei']) + m['variables'] = fexp ? fexp.compile_variables : {} + end + + @context.storage.put_msg(action, m) end end # Adds a tracker (usually when a 'listen' expression gets applied). # - def add_tracker (wfid, action, fei, conditions, msg, doc=nil) + def add_tracker(wfid, action, id, conditions, msg) - doc ||= @context.storage.get_trackers + doc = @context.storage.get_trackers - doc['trackers'][Ruote.to_storage_id(fei)] = + doc['trackers'][id] = { 'wfid' => wfid, 'action' => action, - 'fei' => fei, + 'id' => id, 'conditions' => conditions, 'msg' => msg } r = @context.storage.put(doc) - add_tracker(wfid, action, fei, msg, r) if r + add_tracker(wfid, action, id, msg) if r # the put failed, have to redo the work end # Removes a tracker (usually when a 'listen' expression replies to its # parent expression or is cancelled). # - def remove_tracker (fei, doc=nil) + def remove_tracker(fei, doc=nil) doc ||= @context.storage.get_trackers doc['trackers'].delete(Ruote.to_storage_id(fei)) @@ -111,14 +142,23 @@ # the put failed, have to redo the work end protected - def does_match? (msg, conditions) + def does_match?(msg, conditions) + return true unless conditions + conditions.each do |k, v| + val = msg[k] - return false unless val && val.match(v) + v = Ruote.regex_or_s(v) + + if v.is_a?(String) + return false unless val && v == val + else + return false unless val && v.match(val) + end end true end end