lib/ruote/part/dispatch_pool.rb in ruote-2.1.8 vs lib/ruote/part/dispatch_pool.rb in ruote-2.1.9

- old
+ new

@@ -37,10 +37,42 @@ def initialize (context) @context = context end + def handle (msg) + + case msg['action'] + when 'dispatch' + dispatch(msg) + when 'dispatch_cancel' + dispatch_cancel(msg) + else + # simply discard message + end + end + + protected + + def dispatch_cancel (msg) + + flavour = msg['flavour'] + + participant = @context.plist.lookup(msg['participant_name']) + + begin + participant.cancel(Ruote::FlowExpressionId.new(msg['fei']), flavour) + rescue Exception => e + raise(e) if flavour != 'kill' + end + + @context.storage.put_msg( + 'reply', + 'fei' => msg['fei'], + 'workitem' => msg['workitem']) + end + def dispatch (msg) participant = @context.plist.lookup(msg['participant_name']) if participant.respond_to?(:do_not_thread) && participant.do_not_thread @@ -48,14 +80,14 @@ else do_threaded_dispatch(participant, msg) end end - protected - def do_dispatch (participant, msg) workitem = Ruote::Workitem.new(msg['workitem']) + + workitem.fields['dispatched_at'] = Ruote.now_to_utc_s participant.consume(workitem) end def do_threaded_dispatch (participant, msg)