lib/ruote/part/dispatch_pool.rb in ruote-2.1.8 vs lib/ruote/part/dispatch_pool.rb in ruote-2.1.9
- old
+ new
@@ -37,10 +37,42 @@
def initialize (context)
@context = context
end
+ def handle (msg)
+
+ case msg['action']
+ when 'dispatch'
+ dispatch(msg)
+ when 'dispatch_cancel'
+ dispatch_cancel(msg)
+ else
+ # simply discard message
+ end
+ end
+
+ protected
+
+ def dispatch_cancel (msg)
+
+ flavour = msg['flavour']
+
+ participant = @context.plist.lookup(msg['participant_name'])
+
+ begin
+ participant.cancel(Ruote::FlowExpressionId.new(msg['fei']), flavour)
+ rescue Exception => e
+ raise(e) if flavour != 'kill'
+ end
+
+ @context.storage.put_msg(
+ 'reply',
+ 'fei' => msg['fei'],
+ 'workitem' => msg['workitem'])
+ end
+
def dispatch (msg)
participant = @context.plist.lookup(msg['participant_name'])
if participant.respond_to?(:do_not_thread) && participant.do_not_thread
@@ -48,14 +80,14 @@
else
do_threaded_dispatch(participant, msg)
end
end
- protected
-
def do_dispatch (participant, msg)
workitem = Ruote::Workitem.new(msg['workitem'])
+
+ workitem.fields['dispatched_at'] = Ruote.now_to_utc_s
participant.consume(workitem)
end
def do_threaded_dispatch (participant, msg)