lib/ruote/part/dispatch_pool.rb in ruote-2.1.9 vs lib/ruote/part/dispatch_pool.rb in ruote-2.1.10
- old
+ new
@@ -87,28 +87,28 @@
workitem = Ruote::Workitem.new(msg['workitem'])
workitem.fields['dispatched_at'] = Ruote.now_to_utc_s
participant.consume(workitem)
+
+ @context.storage.put_msg('dispatched', 'fei' => msg['fei'])
+ # once the consume is done, asynchronously flag the
+ # participant expression as 'dispatched'
end
def do_threaded_dispatch (participant, msg)
+ # Maybe at some point a limit on the number of dispatch threads
+ # would be OK.
+ # Or maybe it's the job of an extension / subclass
+
Thread.new do
begin
do_dispatch(participant, msg)
- rescue Exception => e
-
- #puts '/' * 80
- #p e
- #puts '/' * 80
-
- @context.worker.handle_exception(
- msg,
- Ruote::Exp::FlowExpression.fetch(@context, msg['workitem']['fei']),
- e)
+ rescue Exception => exception
+ @context.error_handler.msg_handle(msg, exception)
end
end
end
end
end