lib/ruote/svc/dispatch_pool.rb in ruote-2.3.0.1 vs lib/ruote/svc/dispatch_pool.rb in ruote-2.3.0.2

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -39,35 +39,40 @@ @context = context end def handle(msg) - case msg['action'] - when 'dispatch' then dispatch(msg) - when 'dispatch_cancel' then dispatch_cancel(msg) - when 'dispatch_pause', 'dispatch_resume' then dispatch_pause(msg) - else # simply discard the message - end + return unless msg['action'].match(/^dispatch/) + + send(msg['action'], msg) end protected + # Dispatching the msg. + # def dispatch(msg) participant = @context.plist.lookup( msg['participant'] || msg['participant_name'], msg['workitem']) - if (@context['participant_threads_enabled'] == false) || do_not_thread?(participant, msg) + if + @context['participant_threads_enabled'] == false || + do_not_thread?(participant, msg) + then do_dispatch(participant, msg) else do_threaded_dispatch(participant, msg) end end # The actual dispatching (call to Participant#consume or #on_workitem). # - def do_dispatch(participant, msg) + # No error rescuing so it might be interesting for some extension + # classes (like in ruote-swf). + # + def do_raw_dispatch(participant, msg) workitem = Ruote::Workitem.new(msg['workitem']) workitem.fields['dispatched_at'] = Ruote.now_to_utc_s @@ -81,10 +86,20 @@ 'workitem' => msg['workitem']) # once the consume is done, asynchronously flag the # participant expression as 'dispatched' end + # The raw dispatch work, wrapped in error handling. + # + def do_dispatch(participant, msg) + + do_raw_dispatch(participant, msg) + + rescue => err + @context.error_handler.msg_handle(msg, err) + end + # Wraps the call to do_dispatch in a thread. # def do_threaded_dispatch(participant, msg) msg = Rufus::Json.dup(msg) @@ -95,19 +110,11 @@ # Maybe at some point a limit on the number of dispatch threads # would be OK. # Or maybe it's the job of an extension / subclass - Thread.new do - begin - - do_dispatch(participant, msg) - - rescue => exception - @context.error_handler.msg_handle(msg, exception) - end - end + Thread.new { do_dispatch(participant, msg) } end # Returns true if the participant doesn't want the #consume to happen # in a new Thread. # @@ -163,9 +170,13 @@ Ruote.participant_send( participant, action, 'fei' => Ruote::FlowExpressionId.new(msg['fei']), :default => false) end + + # Route to dispatch_pause which handles both pause and resume. + # + alias dispatch_resume dispatch_pause end # Given a participant, a method name or an array of method names and # a hash of arguments, will do its best to set the instance variables # corresponding to the arguments (if possible) and to call the