lib/ruote/svc/dispatch_pool.rb in ruote-2.1.11 vs lib/ruote/svc/dispatch_pool.rb in ruote-2.2.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -32,85 +32,112 @@ # Can be extended/replaced for better handling of Thread (why not something # like a thread pool or no threads at all). # class DispatchPool - def initialize (context) + def initialize(context) @context = context end - def handle (msg) + def handle(msg) case msg['action'] when 'dispatch' dispatch(msg) when 'dispatch_cancel' dispatch_cancel(msg) else - # simply discard message + # simply discard the message end end protected - def dispatch_cancel (msg) + def dispatch_cancel(msg) flavour = msg['flavour'] participant = @context.plist.instantiate(msg['participant']) begin participant.cancel(Ruote::FlowExpressionId.new(msg['fei']), flavour) - rescue Exception => e + rescue => e raise(e) if flavour != 'kill' end @context.storage.put_msg( 'reply', 'fei' => msg['fei'], 'workitem' => msg['workitem']) end - def dispatch (msg) + def dispatch(msg) participant = @context.plist.lookup( msg['participant'] || msg['participant_name'], msg['workitem']) - if participant.respond_to?(:do_not_thread) && participant.do_not_thread + if do_not_thread(participant, msg) do_dispatch(participant, msg) else do_threaded_dispatch(participant, msg) end end - def do_dispatch (participant, msg) + # The actual dispatching (call to Participant#consume). + # + def do_dispatch(participant, msg) workitem = Ruote::Workitem.new(msg['workitem']) workitem.fields['dispatched_at'] = Ruote.now_to_utc_s participant.consume(workitem) - @context.storage.put_msg('dispatched', 'fei' => msg['fei']) - # once the consume is done, asynchronously flag the - # participant expression as 'dispatched' + @context.storage.put_msg( + 'dispatched', + 'fei' => msg['fei'], + 'participant_name' => workitem.participant_name) + # once the consume is done, asynchronously flag the + # participant expression as 'dispatched' end - def do_threaded_dispatch (participant, msg) + # Wraps the call to do_dispatch in a thread. + # + def do_threaded_dispatch(participant, msg) + msg = Rufus::Json.dup(msg) + # + # the thread gets its own copy of the message + # (especially important if the main thread does something with + # the message 'during' the dispatch) + # Maybe at some point a limit on the number of dispatch threads # would be OK. # Or maybe it's the job of an extension / subclass Thread.new do begin do_dispatch(participant, msg) - rescue Exception => exception + rescue => exception @context.error_handler.msg_handle(msg, exception) end + end + end + + # Returns true if the participant doesn't want the #consume to happen + # in a new Thread. + # + def do_not_thread(participant, msg) + + return false unless participant.respond_to?(:do_not_thread) + + if participant.method(:do_not_thread).arity == 0 + participant.do_not_thread + else + participant.do_not_thread(Ruote::Workitem.new(msg['workitem'])) end end end end