lib/ruote/svc/tracker.rb in ruote-2.3.0.1 vs lib/ruote/svc/tracker.rb in ruote-2.3.0.2

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -37,23 +37,114 @@ def initialize(context) @context = context end - # The context calls this method for each successfully processed msg - # in the worker. + # The worker calls this method via the context before each msg gets + # processed. # - def on_msg(message) + def on_pre_msg(msg) - m_error = message['error'] + on_message(true, msg) + end + + # The worker calls this method via the context after each successful + # msg processing. + # + def on_msg(msg) + + on_message(false, msg) + end + + # Adds a tracker (usually when a 'listen' expression gets applied). + # + # The tracker_id may be nil (one will then get generated). + # + # Returns the tracker_id. + # + def add_tracker(wfid, action, tracker_id, conditions, msg) + + tracker_id ||= [ + 'tracker', wfid, action, + Ruote.generate_subid(conditions.hash.to_s + msg.hash.to_s) + ].collect(&:to_s).join('_') + + conditions = + conditions && conditions.remap { |(k, v), h| h[k] = Array(v) } + + doc = @context.storage.get_trackers + + doc['trackers'][tracker_id] = + { 'wfid' => wfid, + 'action' => action, + 'id' => tracker_id, + 'conditions' => conditions, + 'msg' => msg } + + r = @context.storage.put(doc) + + add_tracker(wfid, action, tracker_id, conditions, msg) if r + # the put failed, have to redo the work + + tracker_id + end + + # Removes a tracker (usually when a 'listen' expression replies to its + # parent expression or is cancelled). + # + def remove_tracker(fei_sid_or_id, wfid=nil) + + tracker_id = + if fei_sid_or_id.is_a?(String) + fei_sid_or_id + else + Ruote.to_storage_id(fei_sid_or_id) + end + + remove([ tracker_id ], wfid) + end + + protected + + # Removes a set of tracker ids and updated the tracker document. + # + def remove(tracker_ids, wfid) + + return if tracker_ids.empty? + + doc ||= @context.storage.get_trackers(wfid) + + return if (doc['trackers'].keys & tracker_ids).empty? + + doc['wfid'] = wfid + # a little helper for some some storage implementations like ruote-swf + # they need to know what workflow execution is targetted. + + tracker_ids.each { |ti| doc['trackers'].delete(ti) } + r = @context.storage.put(doc) + + remove(tracker_ids, wfid) if r + # the put failed, have to redo the work + end + + # The method behind on_pre_msg and on_msg. Filters msgs against trackers. + # Triggers trackers if there is a match. + # + def on_message(pre, message) + m_wfid = message['wfid'] || (message['fei']['wfid'] rescue nil) + m_error = message['error'] + m_action = message['action'] + m_action = "pre_#{m_action}" if pre msg = m_action == 'error_intercepted' ? message['msg'] : message - @context.storage.get_trackers['trackers'].each do |tracker_id, tracker| + ids_to_remove = [] + trackers.each do |tracker_id, tracker| + # filter msgs t_wfid = tracker['wfid'] t_action = tracker['action'] @@ -62,83 +153,88 @@ next unless does_match?(message, tracker['conditions']) if tracker_id == 'on_error' || tracker_id == 'on_terminate' - fields = msg['workitem']['fields'] + fs = msg['workitem']['fields'] - next if m_action == 'error_intercepted' && fields['__error__'] - next if m_action == 'terminated' && (fields['__error__'] || fields['__terminate__']) + next if m_action == 'error_intercepted' && fs['__error__'] + next if m_action == 'terminated' && (fs['__error__'] || fs['__terminate__']) end - # prepare and emit/put 'reaction' message + # remove the message post-trigger? - m = Ruote.fulldup(tracker['msg']) + ids_to_remove << tracker_id if tracker['msg'].delete('_auto_remove') - action = m.delete('action') + # OK, have to pull the trigger (or alter the message) then - m['wfid'] = m_wfid if m['wfid'] == 'replace' - m['wfid'] ||= @context.wfidgen.generate - - m['workitem'] = msg['workitem'] if m['workitem'] == 'replace' - - if t_action == 'error_intercepted' - m['workitem']['fields']['__error__'] = m_error - elsif tracker_id == 'on_error' && m_action == 'error_intercepted' - m['workitem']['fields']['__error__'] = m_error - elsif tracker_id == 'on_terminate' && m_action == 'terminated' - m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid } + if pre && tracker['msg']['_alter'] + alter(m_wfid, m_error, m_action, msg, tracker) + else + trigger(m_wfid, m_error, m_action, msg, tracker) end + end - if m['variables'] == 'compile' - fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei']) - m['variables'] = fexp ? fexp.compile_variables : {} - end + remove(ids_to_remove, nil) + end - @context.storage.put_msg(action, m) + # Alters the msg, only called in "pre" mode. + # + def alter(m_wfid, m_error, m_action, msg, tracker) + + case tracker['msg'].delete('_alter') + when 'merge' then msg.merge!(tracker['msg']) + #else ... end end - # Adds a tracker (usually when a 'listen' expression gets applied). + # Prepares the message that gets placed on the ruote msg queue. # - def add_tracker(wfid, action, id, conditions, msg) + def trigger(m_wfid, m_error, m_action, msg, tracker) - conditions = - conditions && conditions.remap { |(k, v), h| h[k] = Array(v) } + t_action = tracker['action'] + tracker_id = tracker['id'] - doc = @context.storage.get_trackers + m = Ruote.fulldup(tracker['msg']) - doc['trackers'][id] = - { 'wfid' => wfid, - 'action' => action, - 'id' => id, - 'conditions' => conditions, - 'msg' => msg } + action = m.delete('action') - r = @context.storage.put(doc) + m['wfid'] = m_wfid if m['wfid'] == 'replace' + m['wfid'] ||= @context.wfidgen.generate - add_tracker(wfid, action, id, conditions, msg) if r - # the put failed, have to redo the work - end + m['workitem'] = msg['workitem'] if m['workitem'] == 'replace' - # Removes a tracker (usually when a 'listen' expression replies to its - # parent expression or is cancelled). - # - def remove_tracker(fei, doc=nil) + if t_action == 'error_intercepted' + m['workitem']['fields']['__error__'] = m_error + elsif tracker_id == 'on_error' && m_action == 'error_intercepted' + m['workitem']['fields']['__error__'] = m_error + elsif tracker_id == 'on_terminate' && m_action == 'terminated' + m['workitem']['fields']['__terminate__'] = { 'wfid' => m_wfid } + end - doc ||= @context.storage.get_trackers + if m['variables'] == 'compile' + fexp = Ruote::Exp::FlowExpression.fetch(@context, msg['fei']) + m['variables'] = fexp ? fexp.compile_variables : {} + end - doc['trackers'].delete(Ruote.to_storage_id(fei)) + @context.storage.put_msg(action, m) + end - r = @context.storage.put(doc) + # Returns the trackers currently registered. + # + # Note: this is called from on_pre_msg and on_msg, hence two times + # for a single msg. We trust the storage implementation to cache it + # for us. + # + def trackers - remove_tracker(fei, r) if r - # the put failed, have to redo the work + @context.storage.get_trackers['trackers'] end - protected - + # Given a msg and a hash of conditions, returns true if the msg + # matches the conditions. + # def does_match?(msg, conditions) return true unless conditions conditions.each do |k, v| @@ -149,15 +245,17 @@ # track conditions are already stored as arrays. vv = Ruote.regex_or_s(vv) val = case k + when 'class' then msg['error']['class'] when 'message' then msg['error']['message'] - else msg[k] + + else Ruote.lookup(msg, k) end - val && (vv.is_a?(String) ? (vv == val) : vv.match(val)) + val && (vv.is_a?(Regexp) ? vv.match(val) : vv == val) end end true end